You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:45 UTC
[08/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
new file mode 100644
index 0000000..66c610a
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * A class that provides a line reader from an input stream.
+ * Depending on the constructor used, lines will either be terminated by:
+ * <ul>
+ * <li>one of the following: '\n' (LF), '\r' (CR),
+ * or '\r\n' (CR+LF).</li>
+ * <li><em>or</em>, a custom byte sequence delimiter</li>
+ * </ul>
+ * In both cases, EOF also terminates an otherwise unterminated
+ * line.
+ *
+ * <p>The reader keeps its own read-ahead buffer over the wrapped stream, so
+ * bytes may be pulled from the stream before a caller asks for them. Instances
+ * hold mutable buffer state and perform no synchronization.
+ */
+public class LineReader implements Closeable {
+  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private int bufferSize = DEFAULT_BUFFER_SIZE;
+  private InputStream in;
+  private byte[] buffer;
+  // the number of bytes of real data in the buffer
+  private int bufferLength = 0;
+  // the current position in the buffer
+  private int bufferPosn = 0;
+
+  private static final byte CR = '\r';
+  private static final byte LF = '\n';
+
+  // The custom record delimiter; null selects CR / LF / CRLF line endings.
+  private final byte[] recordDelimiterBytes;
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size (64k).
+   *
+   * @param in The input stream
+   */
+  public LineReader(InputStream in) {
+    this(in, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size.
+   *
+   * @param in The input stream
+   * @param bufferSize Size of the read buffer
+   */
+  public LineReader(InputStream in, int bufferSize) {
+    this(in, bufferSize, (byte[]) null);
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code> (64k when the key is unset).
+   *
+   * @param in input stream
+   * @param conf configuration
+   * @throws IOException declared for API compatibility
+   */
+  public LineReader(InputStream in, Configuration conf) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in The input stream
+   * @param recordDelimiterBytes The delimiter
+   */
+  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+    this(in, DEFAULT_BUFFER_SIZE, recordDelimiterBytes);
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in The input stream
+   * @param bufferSize Size of the read buffer
+   * @param recordDelimiterBytes The delimiter, or null for CR / LF / CRLF
+   */
+  public LineReader(InputStream in, int bufferSize,
+                    byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in input stream
+   * @param conf configuration
+   * @param recordDelimiterBytes The delimiter
+   * @throws IOException declared for API compatibility
+   */
+  public LineReader(InputStream in, Configuration conf,
+                    byte[] recordDelimiterBytes) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE),
+        recordDelimiterBytes);
+  }
+
+  /**
+   * Close the underlying stream.
+   *
+   * @throws IOException if closing the underlying stream fails
+   */
+  public void close() throws IOException {
+    in.close();
+  }
+
+  /**
+   * Discard any buffered, not-yet-consumed bytes. The underlying stream is
+   * not repositioned; the next read continues from wherever it currently is.
+   */
+  public void reset() {
+    bufferLength = 0;
+    bufferPosn = 0;
+  }
+
+  /**
+   * Read one line from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line (without newline)
+   * @param maxLineLength the maximum number of bytes to store into str;
+   *  the rest of the line is silently discarded.
+   * @param maxBytesToConsume the maximum number of bytes to consume
+   *  in this call. This is only a hint, because if the line cross
+   *  this threshold, we allow it to happen. It can overshoot
+   *  potentially by as much as one buffer length.
+   * @return the number of bytes read including the (longest) newline
+   *  found.
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength,
+                      int maxBytesToConsume) throws IOException {
+    if (this.recordDelimiterBytes != null) {
+      return readCustomLine(str, maxLineLength, maxBytesToConsume);
+    } else {
+      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
+    }
+  }
+
+  /**
+   * Read the next chunk of input into {@code buffer}. Subclasses may override
+   * this hook. {@code inDelimiter} is true when the previous buffer ended in
+   * the middle of a possible terminator (a trailing CR, or a partial custom
+   * delimiter); the default implementation ignores it.
+   *
+   * @return the number of bytes read, or a value &lt;= 0 at end of stream
+   */
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    return in.read(buffer);
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF into {@code str}
+   * (which is cleared first).
+   */
+  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR. In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need consume LF as well, so next call to readLine will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     */
+    str.clear();
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true if prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF, appending its bytes
+   * (without the terminator) to {@code str}.
+   *
+   * <p>Unlike {@link #readLine(Text, int, int)}, {@code str} is NOT cleared,
+   * so successive calls accumulate lines back to back. Whenever at least one
+   * byte was consumed, the number of bytes appended for this line (capped by
+   * {@code maxLineLength}) is added to {@code offsets}.
+   *
+   * @return the number of bytes consumed from the input, including the
+   *  terminator; 0 at end of stream
+   * @throws IOException if the underlying stream throws, or the line is
+   *  longer than {@code Integer.MAX_VALUE} bytes
+   */
+  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
+      , int maxBytesToConsume)
+      throws IOException {
+    // Same CR / LF / CRLF state machine as readDefaultLine(Text, ...) above.
+    int txtLength = 0; //tracks the bytes appended to str for this line
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true if prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.write(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+
+    if (bytesConsumed > 0) offsets.add(txtLength);
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read a record terminated by the custom delimiter into {@code str}
+   * (which is cleared first).
+   *
+   * @return the number of bytes consumed from the input, including the
+   *  delimiter bytes
+   */
+  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* We're reading data from inputStream, but the head of the stream may be
+     * already captured in the previous buffer, so we have several cases:
+     *
+     * 1. The buffer tail does not contain any character sequence which
+     *    matches with the head of delimiter. We count it as an
+     *    ambiguous byte count = 0
+     *
+     * 2. The buffer tail contains X characters that match the head of the
+     *    delimiter. We count ambiguous byte count = X
+     *
+     *    e.g. delimiter = "record"; the buffer ends with
+     *    "... I have completely re" and the next buffer starts with
+     *    "ad about it. record 179 ...". The matching characters between the
+     *    buffer tail and the delimiter head are "re", so the ambiguous byte
+     *    count is 2.
+     *
+     *    2.1 If the following bytes are the remaining characters of the
+     *        delimiter, we capture only up to the starting position of the
+     *        delimiter; the ambiguous characters are not included in str.
+     *
+     *    2.2 If the following bytes are not the remaining characters of the
+     *        delimiter (as in the example), the ambiguous characters are
+     *        data and must be included in str.
+     *
+     * In both cases the ambiguous bytes have been read from the stream and
+     * are counted in the returned byte count. Known limitation: in case 2.2
+     * the carried-over bytes are not rescanned for the start of another
+     * delimiter occurrence.
+     */
+    str.clear();
+    int txtLength = 0; // bytes copied from buffer into str; bounds maxLineLength
+    long bytesConsumed = 0;
+    int delPosn = 0;
+    int ambiguousByteCount = 0; // delimiter-prefix bytes carried from the previous buffer
+    do {
+      int startPosn = bufferPosn; // Start from previous end position
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
+        if (bufferLength <= 0) {
+          // EOF: a dangling delimiter prefix is plain data; count it as read.
+          if (ambiguousByteCount > 0) {
+            str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+            bytesConsumed += ambiguousByteCount;
+          }
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) {
+        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
+          delPosn++;
+          if (delPosn >= recordDelimiterBytes.length) {
+            bufferPosn++;
+            break;
+          }
+        } else if (delPosn != 0) {
+          // The partial match failed. Resume scanning one byte after where it
+          // began (the loop increment supplies the +1), so delimiters whose
+          // prefix overlaps themselves (e.g. "aab" in "aaab") are still
+          // found. If the partial match began in the previous buffer, resume
+          // at the start of this buffer.
+          bufferPosn -= delPosn;
+          if (bufferPosn < -1) {
+            bufferPosn = -1;
+          }
+          delPosn = 0;
+        }
+      }
+      int readLength = bufferPosn - startPosn;
+      bytesConsumed += readLength;
+      int appendLength = readLength - delPosn;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      // The carried-over bytes were subtracted when they were deferred; they
+      // are consumed now either as data (case 2.2) or as part of the
+      // delimiter (case 2.1), so count them back in both cases.
+      bytesConsumed += ambiguousByteCount;
+      // appendLength < 0 means the current match still extends back into the
+      // carried-over bytes (they are a delimiter prefix). Otherwise they are
+      // data -- including when appendLength == 0, i.e. a full delimiter
+      // immediately follows them (refer case 2.2).
+      if (appendLength >= 0 && ambiguousByteCount > 0) {
+        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+        ambiguousByteCount = 0;
+      }
+      if (appendLength > 0) {
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+      if (bufferPosn >= bufferLength) {
+        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
+          ambiguousByteCount = delPosn;
+          bytesConsumed -= ambiguousByteCount; //to be consumed in next
+        }
+      }
+    } while (delPosn < recordDelimiterBytes.length
+        && bytesConsumed < maxBytesToConsume);
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line
+   * @param maxLineLength the maximum number of bytes to store into str.
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength) throws IOException {
+    return readLine(str, maxLineLength, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Read from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str) throws IOException {
+    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
new file mode 100644
index 0000000..e4439f3
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link Scanner} that reads a collection of file fragments one after
+ * another, in the fragments' natural sort order, as a single input.
+ * At most one underlying scanner is open at a time; each is closed once
+ * it is exhausted.
+ */
+public class MergeScanner implements Scanner {
+  private Configuration conf;
+  private TableMeta meta;
+  private Schema schema;
+  private List<FileFragment> fragments;
+  private Iterator<FileFragment> iterator;
+  private FileFragment currentFragment;
+  private Scanner currentScanner;
+  private boolean projectable = false;
+  private boolean selectable = false;
+  private Schema target;
+
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList)
+      throws IOException {
+    this(conf, schema, meta, rawFragmentList, schema);
+  }
+
+  /**
+   * @param conf configuration; it is cast to {@code TajoConf} when the
+   *  per-fragment scanners are created
+   * @param schema schema of the stored data
+   * @param meta table metadata used to pick the per-fragment scanner
+   * @param rawFragmentList fragments to read; copied and sorted, the caller's
+   *  collection is not modified
+   * @param target projected schema handed to each per-fragment scanner
+   */
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection<FileFragment> rawFragmentList,
+                      Schema target)
+      throws IOException {
+    this.conf = conf;
+    this.schema = schema;
+    this.meta = meta;
+    this.fragments = Lists.newArrayList(rawFragmentList);
+    Collections.sort(fragments);
+
+    this.target = target;
+    this.reset();
+    // Capabilities are taken from the scanner of the first fragment.
+    if (currentScanner != null) {
+      this.projectable = currentScanner.isProjectable();
+      this.selectable = currentScanner.isSelectable();
+    }
+  }
+
+  /** No-op: the first fragment's scanner is already opened by the constructor. */
+  @Override
+  public void init() throws IOException {
+  }
+
+  /**
+   * Return the next tuple, advancing across fragments as needed. Empty
+   * fragments are skipped, so {@code null} is returned only once every
+   * fragment has been exhausted (or after {@link #close()}).
+   */
+  @Override
+  public Tuple next() throws IOException {
+    while (currentScanner != null) {
+      Tuple tuple = currentScanner.next();
+      if (tuple != null) {
+        return tuple;
+      }
+      // Current fragment is exhausted (or was empty): move on to the next one.
+      currentScanner.close();
+      currentScanner = getNextScanner();
+    }
+    return null;
+  }
+
+  /** Restart from the first fragment, closing any scanner that is still open. */
+  @Override
+  public void reset() throws IOException {
+    if (currentScanner != null) {
+      currentScanner.close();
+      currentScanner = null;
+    }
+    this.iterator = fragments.iterator();
+    this.currentScanner = getNextScanner();
+  }
+
+  /** Open and initialize a scanner for the next fragment, or return null if none remain. */
+  private Scanner getNextScanner() throws IOException {
+    if (iterator.hasNext()) {
+      currentFragment = iterator.next();
+      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, schema,
+          currentFragment, target);
+      currentScanner.init();
+      return currentScanner;
+    } else {
+      return null;
+    }
+  }
+
+  /** Close the open scanner (if any) and drop all fragments; safe to call twice. */
+  @Override
+  public void close() throws IOException {
+    try {
+      if (currentScanner != null) {
+        currentScanner.close();
+      }
+    } finally {
+      currentScanner = null;
+      iterator = null;
+      if (fragments != null) {
+        fragments.clear();
+      }
+    }
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return projectable;
+  }
+
+  /** Takes effect for scanners opened after this call, not the one already open. */
+  @Override
+  public void setTarget(Column[] targets) {
+    this.target = new Schema(targets);
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return selectable;
+  }
+
+  /** Ignored by this scanner. */
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public boolean isSplittable(){
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
new file mode 100644
index 0000000..94d13ee
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Comparator;
+
+/**
+ * Orders paths by the integer value of their last name component, so that
+ * e.g. "2" sorts before "10" (a lexicographic comparison would not).
+ * Every compared name must be parseable by {@link Integer#parseInt(String)};
+ * otherwise a {@link NumberFormatException} is thrown.
+ */
+public class NumericPathComparator implements Comparator<Path> {
+
+  @Override
+  public int compare(Path p1, Path p2) {
+    int num1 = Integer.parseInt(p1.getName());
+    int num2 = Integer.parseInt(p2.getName());
+
+    // Do not subtract: num1 - num2 overflows for large operands of opposite
+    // sign (e.g. Integer.MIN_VALUE vs 1) and would invert the ordering.
+    if (num1 < num2) {
+      return -1;
+    }
+    return (num1 == num2) ? 0 : 1;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
new file mode 100644
index 0000000..db511dc
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.datum.TimestampDatum;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.BitArray;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+public class RawFile {
+  private static final Log LOG = LogFactory.getLog(RawFile.class);
+
+  /**
+   * Scanner for RawFile data on the local file system.
+   *
+   * <p>Each record starts with a header made of a 4-byte record size (header included),
+   * a 2-byte length of the null-flag bitmap and the bitmap itself. The values of the
+   * non-null columns follow in schema order.
+   */
+  public static class RawFileScanner extends FileScanner implements SeekableScanner {
+    private FileChannel channel;
+    private DataType[] columnTypes;
+    private Path path;
+
+    // Kept in read mode between calls; it then holds the file bytes
+    // [channel.position() - buffer.limit(), channel.position()).
+    private ByteBuffer buffer;
+    // A single tuple instance that next() refills and returns on every call.
+    private Tuple tuple;
+
+    private int headerSize = 0;
+    private BitArray nullFlags;
+    private static final int RECORD_SIZE = 4;
+    private boolean eof = false;
+    private long fileSize;
+    private FileInputStream fis;
+
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+      super(conf, schema, meta, null);
+      this.path = path;
+      init();
+    }
+
+    @SuppressWarnings("unused")
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+      this(conf, schema, meta, fragment.getPath());
+    }
+
+    /**
+     * Opens the file, allocates the read buffer and loads its first block.
+     * The path is opened through {@link java.io.File}, so it must be a local path.
+     */
+    public void init() throws IOException {
+      //Preconditions.checkArgument(FileUtil.isLocalPath(path));
+      // TODO - to make it unified one.
+      URI uri = path.toUri();
+      fis = new FileInputStream(new File(uri));
+      channel = fis.getChannel();
+      fileSize = channel.size();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
+      }
+
+      buffer = ByteBuffer.allocateDirect(128 * 1024);
+
+      columnTypes = new DataType[schema.getColumnNum()];
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        columnTypes[i] = schema.getColumn(i).getDataType();
+      }
+
+      tuple = new VTuple(columnTypes.length);
+
+      // initial read
+      channel.read(buffer);
+      buffer.flip();
+
+      nullFlags = new BitArray(schema.getColumnNum());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
+
+      super.init();
+    }
+
+    /** Returns the file offset of the next unread byte. */
+    @Override
+    public long getNextOffset() throws IOException {
+      return channel.position() - buffer.remaining();
+    }
+
+    /**
+     * Positions the scanner so that the next read starts at the given file offset.
+     * The offset should be a record boundary, e.g. a value from {@link #getNextOffset()}.
+     */
+    @Override
+    public void seek(long offset) throws IOException {
+      // The buffer holds the file bytes [bufferStart, bufferEnd).
+      long bufferEnd = channel.position();
+      long bufferStart = bufferEnd - buffer.limit();
+      if (bufferStart <= offset && offset < bufferEnd) {
+        // Already buffered: just move the cursor.
+        buffer.position((int) (offset - bufferStart));
+      } else {
+        buffer.clear();
+        channel.position(offset);
+        channel.read(buffer);
+        buffer.flip();
+      }
+      // Seeking (possibly backwards) invalidates a previously reached end-of-file state.
+      eof = false;
+    }
+
+    /**
+     * Moves the unread bytes to the front of the buffer and appends more data from the channel.
+     *
+     * @return false, after marking EOF, if the channel has no more data
+     */
+    private boolean fillBuffer() throws IOException {
+      buffer.compact();
+      int read = channel.read(buffer);
+      // Always return the buffer to read mode, so that position/limit keep describing
+      // the buffered file range even after EOF has been reached.
+      buffer.flip();
+      if (read == -1) {
+        eof = true;
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      if (eof) {
+        return null;
+      }
+
+      if (buffer.remaining() < headerSize) {
+        if (!fillBuffer()) {
+          return null;
+        }
+      }
+
+      // read the record header; remember the limit because it is narrowed below
+      int bufferLimit = buffer.limit();
+      int recordSize = buffer.getInt();
+      int nullFlagSize = buffer.getShort();
+
+      // narrow the limit to the null-flag bytes before handing the buffer to BitArray
+      buffer.limit(buffer.position() + nullFlagSize);
+      nullFlags.fromByteBuffer(buffer);
+      // restore the limit so that the column values become readable again
+      buffer.limit(bufferLimit);
+      if (buffer.remaining() < (recordSize - headerSize)) {
+        if (!fillBuffer()) {
+          return null;
+        }
+      }
+
+      for (int i = 0; i < columnTypes.length; i++) {
+        // check if the i'th column is null
+        if (nullFlags.get(i)) {
+          tuple.put(i, DatumFactory.createNullDatum());
+          continue;
+        }
+
+        switch (columnTypes[i].getType()) {
+          case BOOLEAN :
+            tuple.put(i, DatumFactory.createBool(buffer.get()));
+            break;
+
+          case BIT :
+            tuple.put(i, DatumFactory.createBit(buffer.get()));
+            break;
+
+          case CHAR :
+            // 4-byte real length followed by a payload of the declared fixed width
+            int realLen = buffer.getInt();
+            byte[] buf = new byte[columnTypes[i].getLength()];
+            buffer.get(buf);
+            byte[] charBuf = Arrays.copyOf(buf, realLen);
+            tuple.put(i, DatumFactory.createChar(charBuf));
+            break;
+
+          case INT2 :
+            tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
+            break;
+
+          case INT4 :
+            tuple.put(i, DatumFactory.createInt4(buffer.getInt()));
+            break;
+
+          case INT8 :
+            tuple.put(i, DatumFactory.createInt8(buffer.getLong()));
+            break;
+
+          case FLOAT4 :
+            tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
+            break;
+
+          case FLOAT8 :
+            tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
+            break;
+
+          case TEXT :
+            // Keep the exact bytes written by the appender instead of decoding them
+            // with the platform default charset.
+            int strSize2 = buffer.getInt();
+            byte [] strBytes2 = new byte[strSize2];
+            buffer.get(strBytes2);
+            tuple.put(i, DatumFactory.createText(strBytes2));
+            break;
+
+          case TIMESTAMP:
+            tuple.put(i, DatumFactory.createTimeStampFromMillis(buffer.getLong()));
+            break;
+
+          case BLOB : {
+            int byteSize = buffer.getInt();
+            byte [] rawBytes = new byte[byteSize];
+            buffer.get(rawBytes);
+            tuple.put(i, DatumFactory.createBlob(rawBytes));
+            break;
+          }
+
+          case PROTOBUF: {
+            int byteSize = buffer.getInt();
+            byte [] rawBytes = new byte[byteSize];
+            buffer.get(rawBytes);
+
+            ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
+            Message.Builder builder = factory.newBuilder();
+            builder.mergeFrom(rawBytes);
+            tuple.put(i, factory.createDatum(builder.build()));
+            break;
+          }
+
+          case INET4 :
+            byte [] ipv4Bytes = new byte[4];
+            buffer.get(ipv4Bytes);
+            tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
+            break;
+
+          case NULL_TYPE:
+            tuple.put(i, NullDatum.get());
+            break;
+
+          default:
+            // Skipping an unknown value silently would misalign every following column.
+            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
+        }
+      }
+
+      if (!buffer.hasRemaining() && channel.position() == fileSize) {
+        eof = true;
+      }
+      return tuple;
+    }
+
+    /** Rewinds to the beginning of the file and reloads the first block. */
+    @Override
+    public void reset() throws IOException {
+      // clear the buffer
+      buffer.clear();
+      // reload initial buffer
+      channel.position(0);
+      channel.read(buffer);
+      buffer.flip();
+      eof = false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      buffer.clear();
+      channel.close();
+      fis.close();
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return false;
+    }
+  }
+
+  /**
+   * Appender that writes tuples in the RawFile format to the local file system.
+   * Records are staged in a direct buffer and written to the file channel in bulk.
+   */
+  public static class RawFileAppender extends FileAppender {
+    private FileChannel channel;
+    private RandomAccessFile randomAccessFile;
+    private DataType[] columnTypes;
+
+    private ByteBuffer buffer;
+    private BitArray nullFlags;
+    private int headerSize = 0;
+    private static final int RECORD_SIZE = 4;
+    // total number of record bytes (headers included) appended so far
+    private long pos;
+
+    private TableStatistics stats;
+
+    public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+      super(conf, schema, meta, path);
+    }
+
+    /** Opens the target file for writing and prepares the write buffer. */
+    public void init() throws IOException {
+      // TODO - RawFile only works on Local File System.
+      //Preconditions.checkArgument(FileUtil.isLocalPath(path));
+      File file = new File(path.toUri());
+      randomAccessFile = new RandomAccessFile(file, "rw");
+      channel = randomAccessFile.getChannel();
+      pos = 0;
+
+      columnTypes = new DataType[schema.getColumnNum()];
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        columnTypes[i] = schema.getColumn(i).getDataType();
+      }
+
+      buffer = ByteBuffer.allocateDirect(64 * 1024);
+
+      // compute the number of bytes representing the null flags
+      nullFlags = new BitArray(schema.getColumnNum());
+      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      super.init();
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return pos;
+    }
+
+    /** Writes every buffered byte to the channel and empties the buffer. */
+    private void flushBuffer() throws IOException {
+      buffer.flip();
+      // FileChannel#write is not required to consume the whole buffer in one call
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
+      buffer.clear();
+    }
+
+    /**
+     * Makes room for {@code sizeToBeWritten} more bytes when needed by writing the completed
+     * records [0, recordOffset) to the channel and moving the partially built current record
+     * to the front of the buffer.
+     *
+     * @return true if the buffer was compacted, i.e. the current record now starts at offset 0
+     */
+    private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
+        throws IOException {
+
+      if (buffer.remaining() >= sizeToBeWritten) {
+        return false;
+      }
+
+      int currentEnd = buffer.position();
+      buffer.limit(recordOffset);
+      buffer.flip();
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
+      buffer.position(recordOffset);
+      buffer.limit(currentEnd);
+      buffer.compact();
+
+      return true;
+    }
+
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+
+      if (buffer.remaining() < headerSize) {
+        flushBuffer();
+      }
+
+      // skip the row header; it is filled in once the record length is known
+      int recordOffset = buffer.position();
+      buffer.position(recordOffset + headerSize);
+      // reset the null flags
+      nullFlags.clear();
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        if (enabledStats) {
+          stats.analyzeField(i, t.get(i));
+        }
+
+        if (t.isNull(i)) {
+          nullFlags.set(i);
+          continue;
+        }
+
+        // 8 bytes covers every fixed-size type; variable-size types check again below
+        if (flushBufferAndReplace(recordOffset, 8)) {
+          recordOffset = 0;
+        }
+
+        switch(columnTypes[i].getType()) {
+          case NULL_TYPE:
+            nullFlags.set(i);
+            continue;
+
+          case BOOLEAN:
+          case BIT:
+            buffer.put(t.get(i).asByte());
+            break;
+
+          case CHAR : {
+            int charLength = columnTypes[i].getLength();
+            // a 4-byte real length plus a zero-padded payload of the declared width,
+            // which can exceed the 8 bytes reserved above
+            if (flushBufferAndReplace(recordOffset, charLength + 4)) {
+              recordOffset = 0;
+            }
+            byte[] src = t.getChar(i).asByteArray();
+            byte[] dst = Arrays.copyOf(src, charLength);
+            // never record more bytes than are actually stored in the payload
+            buffer.putInt(Math.min(src.length, charLength));
+            buffer.put(dst);
+            break;
+          }
+
+          case INT2 :
+            buffer.putShort(t.get(i).asInt2());
+            break;
+
+          case INT4 :
+            buffer.putInt(t.get(i).asInt4());
+            break;
+
+          case INT8 :
+            buffer.putLong(t.get(i).asInt8());
+            break;
+
+          case FLOAT4 :
+            buffer.putFloat(t.get(i).asFloat4());
+            break;
+
+          case FLOAT8 :
+            buffer.putDouble(t.get(i).asFloat8());
+            break;
+
+          case TEXT:
+            byte [] strBytes2 = t.get(i).asByteArray();
+            if (flushBufferAndReplace(recordOffset, strBytes2.length + 4)) {
+              recordOffset = 0;
+            }
+            buffer.putInt(strBytes2.length);
+            buffer.put(strBytes2);
+            break;
+
+          case TIMESTAMP:
+            buffer.putLong(((TimestampDatum)t.get(i)).getMillis());
+            break;
+
+          case BLOB : {
+            byte [] rawBytes = t.get(i).asByteArray();
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
+              recordOffset = 0;
+            }
+            buffer.putInt(rawBytes.length);
+            buffer.put(rawBytes);
+            break;
+          }
+
+          case PROTOBUF: {
+            // TODO - to be fixed: consider a varint length prefix (CodedOutputStream)
+            // instead of the fixed 4-byte length used here.
+            byte [] rawBytes = t.get(i).asByteArray();
+            if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
+              recordOffset = 0;
+            }
+            buffer.putInt(rawBytes.length);
+            buffer.put(rawBytes);
+            break;
+          }
+
+          case INET4 :
+            buffer.put(t.get(i).asByteArray());
+            break;
+
+          default:
+            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
+        }
+      }
+
+      // write a record header
+      int bufferPos = buffer.position();
+      buffer.position(recordOffset);
+      buffer.putInt(bufferPos - recordOffset);
+      byte [] flags = nullFlags.toArray();
+      buffer.putShort((short) flags.length);
+      buffer.put(flags);
+
+      pos += bufferPos - recordOffset;
+      buffer.position(bufferPos);
+
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    @Override
+    public void flush() throws IOException {
+      flushBuffer();
+    }
+
+    @Override
+    public void close() throws IOException {
+      flush();
+      if (enabledStats) {
+        stats.setNumBytes(getOffset());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
+      }
+      channel.close();
+      randomAccessFile.close();
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
new file mode 100644
index 0000000..1e89f31
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.BitArray;
+import org.apache.tajo.util.Bytes;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+
+public class RowFile {
+  public static final Log LOG = LogFactory.getLog(RowFile.class);
+
+  // 4-byte marker written in front of every 16-byte sync hash
+  private static final int SYNC_ESCAPE = -1;
+  private static final int SYNC_HASH_SIZE = 16;
+  // escape marker + sync hash
+  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
+  private final static int DEFAULT_BUFFER_SIZE = 65535;
+  public static int SYNC_INTERVAL;
+
+  /**
+   * Splittable scanner over a RowFile fragment. When the fragment does not start at offset 0,
+   * the scanner first skips forward to the next sync marker; it stops at the first sync marker
+   * that lies beyond the end of the fragment.
+   */
+  public static class RowFileScanner extends FileScanner {
+    private FileSystem fs;
+    private FSDataInputStream in;
+    private Tuple tuple;
+
+    private byte[] sync = new byte[SYNC_HASH_SIZE];
+    private byte[] checkSync = new byte[SYNC_HASH_SIZE];
+    private long start, end;
+
+    private ByteBuffer buffer;
+    private final int tupleHeaderSize;
+    private BitArray nullFlags;
+    // file offset of the byte stored at buffer index 0
+    private long bufferStartPos;
+
+    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
+        throws IOException {
+      super(conf, schema, meta, fragment);
+
+      SYNC_INTERVAL =
+          conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname,
+              SYNC_SIZE * 100);
+
+      nullFlags = new BitArray(schema.getColumnNum());
+      // null flags plus two 2-byte lengths (null-flag length and data length)
+      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
+      this.start = fragment.getStartKey();
+      this.end = this.start + fragment.getEndKey();
+    }
+
+    public void init() throws IOException {
+      // set default page size.
+      fs = fragment.getPath().getFileSystem(conf);
+      in = fs.open(fragment.getPath());
+      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.getColumnNum());
+      buffer.flip();
+
+      readHeader();
+
+      // find the correct position from the start
+      if (this.start > in.getPos()) {
+        long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
+        in.seek(realStart);
+      }
+      bufferStartPos = in.getPos();
+      fillBuffer();
+
+      if (start != 0) {
+        // Skip forward, one byte at a time, to the first sync marker so that
+        // reading begins on a record boundary.
+        boolean syncFound = false;
+        boolean exhausted = false;
+        while (!syncFound && !exhausted) {
+          while (!exhausted && buffer.remaining() < SYNC_SIZE) {
+            exhausted = fillBuffer() < 0;
+          }
+          if (exhausted) {
+            // EOF without another sync marker: no record starts in this fragment,
+            // so drain the buffer and let next() report the end of data.
+            buffer.position(buffer.limit());
+          } else {
+            buffer.mark();
+            syncFound = checkSync();
+            if (!syncFound) {
+              buffer.reset();
+              buffer.get(); // proceed one byte
+            }
+          }
+        }
+        bufferStartPos += buffer.position();
+        buffer.compact();
+        buffer.flip();
+      }
+
+      super.init();
+    }
+
+    /** Reads the file header: the sync interval followed by the sync hash. */
+    private void readHeader() throws IOException {
+      SYNC_INTERVAL = in.readInt();
+      Bytes.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
+    }
+
+    /**
+     * Find the sync from the front of the buffer
+     *
+     * @return return true if it succeeds to find the sync.
+     * @throws IOException
+     */
+    private boolean checkSync() throws IOException {
+      buffer.getInt(); // escape
+      buffer.get(checkSync, 0, SYNC_HASH_SIZE); // sync
+      return Arrays.equals(checkSync, sync);
+    }
+
+    /**
+     * Moves the unread bytes to the front of the buffer and appends more data from the stream.
+     *
+     * @return the number of bytes read, or -1 if the stream is exhausted
+     */
+    private int fillBuffer() throws IOException {
+      bufferStartPos += buffer.position();
+      buffer.compact();
+      int remain = buffer.remaining();
+      int read = in.read(buffer);
+      if (read == -1) {
+        buffer.flip();
+        return read;
+      } else {
+        int totalRead = read;
+        if (remain > totalRead) {
+          read = in.read(buffer);
+          totalRead += read > 0 ? read : 0;
+        }
+        buffer.flip();
+        return totalRead;
+      }
+    }
+
+    /**
+     * Reads a 2-byte length written with {@code writeShort}/{@code putShort} as an unsigned
+     * value, so that lengths from 32768 to 65535 are not turned into negative numbers.
+     */
+    private int readUnsignedShort() {
+      return buffer.getShort() & 0xFFFF;
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      while (buffer.remaining() < SYNC_SIZE) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      buffer.mark();
+      if (!checkSync()) {
+        buffer.reset();
+      } else {
+        // a sync marker beyond the fragment end means the rest belongs to the next split
+        if (bufferStartPos + buffer.position() > end) {
+          return null;
+        }
+      }
+
+      while (buffer.remaining() < tupleHeaderSize) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      int i;
+      tuple = new VTuple(schema.getColumnNum());
+
+      int nullFlagSize = readUnsignedShort();
+      byte[] nullFlagBytes = new byte[nullFlagSize];
+      buffer.get(nullFlagBytes, 0, nullFlagSize);
+      nullFlags = new BitArray(nullFlagBytes);
+      int tupleSize = readUnsignedShort();
+
+      while (buffer.remaining() < (tupleSize)) {
+        if (fillBuffer() < 0) {
+          return null;
+        }
+      }
+
+      Datum datum;
+      Column col;
+      for (i = 0; i < schema.getColumnNum(); i++) {
+        if (!nullFlags.get(i)) {
+          col = schema.getColumn(i);
+          switch (col.getDataType().getType()) {
+            case BOOLEAN :
+              datum = DatumFactory.createBool(buffer.get());
+              tuple.put(i, datum);
+              break;
+
+            case BIT:
+              datum = DatumFactory.createBit(buffer.get());
+              tuple.put(i, datum );
+              break;
+
+            case CHAR :
+              int realLen = buffer.getInt();
+              byte[] buf = new byte[col.getDataType().getLength()];
+              buffer.get(buf);
+              byte[] charBuf = Arrays.copyOf(buf, realLen);
+              tuple.put(i, DatumFactory.createChar(charBuf));
+              break;
+
+            case INT2 :
+              datum = DatumFactory.createInt2(buffer.getShort());
+              tuple.put(i, datum );
+              break;
+
+            case INT4 :
+              datum = DatumFactory.createInt4(buffer.getInt());
+              tuple.put(i, datum );
+              break;
+
+            case INT8 :
+              datum = DatumFactory.createInt8(buffer.getLong());
+              tuple.put(i, datum );
+              break;
+
+            case FLOAT4 :
+              datum = DatumFactory.createFloat4(buffer.getFloat());
+              tuple.put(i, datum);
+              break;
+
+            case FLOAT8 :
+              datum = DatumFactory.createFloat8(buffer.getDouble());
+              tuple.put(i, datum);
+              break;
+
+            case TEXT:
+              int bytelen = readUnsignedShort();
+              byte[] strbytes = new byte[bytelen];
+              buffer.get(strbytes, 0, bytelen);
+              datum = DatumFactory.createText(strbytes);
+              tuple.put(i, datum);
+              break;
+
+            case BLOB:
+              int bytesLen = readUnsignedShort();
+              byte [] bytesBuf = new byte[bytesLen];
+              buffer.get(bytesBuf);
+              datum = DatumFactory.createBlob(bytesBuf);
+              tuple.put(i, datum);
+              break;
+
+            case INET4 :
+              byte[] ipv4 = new byte[4];
+              buffer.get(ipv4, 0, 4);
+              datum = DatumFactory.createInet4(ipv4);
+              tuple.put(i, datum);
+              break;
+
+            default:
+              break;
+          }
+        } else {
+          tuple.put(i, DatumFactory.createNullDatum());
+        }
+      }
+      return tuple;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      // init() opens a new input stream, so release the current one first
+      if (in != null) {
+        in.close();
+      }
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (in != null) {
+        in.close();
+      }
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable(){
+      return true;
+    }
+  }
+
+  /**
+   * Appender for the RowFile format. Each record is written as a 2-byte null-flag length,
+   * the null flags, a 2-byte data length and the data bytes; a sync marker (escape + hash)
+   * is inserted whenever at least SYNC_INTERVAL bytes were written since the previous one.
+   */
+  public static class RowFileAppender extends FileAppender {
+    private FSDataOutputStream out;
+    private long lastSyncPos;
+    private FileSystem fs;
+    private byte[] sync;
+    private ByteBuffer buffer;
+
+    private BitArray nullFlags;
+    // statistics
+    private TableStatistics stats;
+
+    public RowFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
+        throws IOException {
+      super(conf, schema, meta, path);
+    }
+
+    /**
+     * Creates the output file, derives the sync hash from the path and the current time,
+     * and writes the file header.
+     *
+     * @throws FileNotFoundException if the parent directory does not exist
+     * @throws AlreadyExistsStorageException if the target file already exists
+     */
+    public void init() throws IOException {
+      SYNC_INTERVAL = conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname, 100);
+
+      fs = path.getFileSystem(conf);
+
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.toString());
+      }
+
+      if (fs.exists(path)) {
+        throw new AlreadyExistsStorageException(path);
+      }
+
+      sync = new byte[SYNC_HASH_SIZE];
+      lastSyncPos = 0;
+
+      out = fs.create(path);
+
+      MessageDigest md;
+      try {
+        md = MessageDigest.getInstance("MD5");
+        md.update((path.toString()+System.currentTimeMillis()).getBytes());
+        sync = md.digest();
+      } catch (NoSuchAlgorithmException e) {
+        LOG.error(e);
+      }
+
+      writeHeader();
+
+      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+
+      nullFlags = new BitArray(schema.getColumnNum());
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      // mark the appender as initialized, as the other FileAppenders do
+      super.init();
+    }
+
+    private void writeHeader() throws IOException {
+      out.writeInt(SYNC_INTERVAL);
+      out.write(sync);
+      out.flush();
+      lastSyncPos = out.getPos();
+    }
+
+    @Override
+    public void addTuple(Tuple t) throws IOException {
+      checkAndWriteSync();
+      Column col;
+
+      buffer.clear();
+      nullFlags.clear();
+
+      for (int i = 0; i < schema.getColumnNum(); i++) {
+        if (enabledStats) {
+          stats.analyzeField(i, t.get(i));
+        }
+
+        if (t.isNull(i)) {
+          nullFlags.set(i);
+        } else {
+          col = schema.getColumn(i);
+          switch (col.getDataType().getType()) {
+            case BOOLEAN:
+              buffer.put(t.getBoolean(i).asByte());
+              break;
+            case BIT:
+              buffer.put(t.getByte(i).asByte());
+              break;
+            case CHAR:
+              byte[] src = t.getChar(i).asByteArray();
+              byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
+              buffer.putInt(src.length);
+              buffer.put(dst);
+              break;
+            case TEXT:
+              byte [] strbytes = t.getText(i).asByteArray();
+              buffer.putShort((short)strbytes.length);
+              buffer.put(strbytes, 0, strbytes.length);
+              break;
+            case INT2:
+              buffer.putShort(t.getShort(i).asInt2());
+              break;
+            case INT4:
+              buffer.putInt(t.getInt(i).asInt4());
+              break;
+            case INT8:
+              buffer.putLong(t.getLong(i).asInt8());
+              break;
+            case FLOAT4:
+              buffer.putFloat(t.getFloat(i).asFloat4());
+              break;
+            case FLOAT8:
+              buffer.putDouble(t.getDouble(i).asFloat8());
+              break;
+            case BLOB:
+              byte [] bytes = t.getBytes(i).asByteArray();
+              buffer.putShort((short)bytes.length);
+              buffer.put(bytes);
+              break;
+            case INET4:
+              buffer.put(t.getIPv4Bytes(i));
+              break;
+            case INET6:
+              buffer.put(t.getIPv6Bytes(i));
+              break;
+            case NULL_TYPE:
+              nullFlags.set(i);
+              break;
+            default:
+              break;
+          }
+        }
+      }
+
+      byte[] bytes = nullFlags.toArray();
+      out.writeShort(bytes.length);
+      out.write(bytes);
+
+      bytes = buffer.array();
+      int dataLen = buffer.position();
+      out.writeShort(dataLen);
+      out.write(bytes, 0, dataLen);
+
+      // Statistical section
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return out.getPos();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (out != null) {
+        // write the trailing sync first so that the recorded size covers the whole file
+        sync();
+        if (enabledStats) {
+          stats.setNumBytes(out.getPos());
+        }
+        out.flush();
+        out.close();
+      }
+    }
+
+    /** Writes a sync marker unless one was written at the current position already. */
+    private void sync() throws IOException {
+      if (lastSyncPos != out.getPos()) {
+        out.writeInt(SYNC_ESCAPE);
+        out.write(sync);
+        lastSyncPos = out.getPos();
+      }
+    }
+
+    private void checkAndWriteSync() throws IOException {
+      if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
+        sync();
+      }
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
new file mode 100644
index 0000000..9f32028
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.TimestampDatum;
+import org.apache.tajo.util.Bytes;
+
+import java.nio.ByteBuffer;
+
+public class RowStoreUtil {
+ public static int[] getTargetIds(Schema inSchema, Schema outSchema) {
+ int[] targetIds = new int[outSchema.getColumnNum()];
+ int i = 0;
+ for (Column target : outSchema.getColumns()) {
+ targetIds[i] = inSchema.getColumnId(target.getQualifiedName());
+ i++;
+ }
+
+ return targetIds;
+ }
+
+ public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
+ out.clear();
+ for (int idx = 0; idx < targetIds.length; idx++) {
+ out.put(idx, in.get(targetIds[idx]));
+ }
+ return out;
+ }
+
+ public static class RowStoreDecoder {
+
+ public static Tuple toTuple(Schema schema, byte [] bytes) {
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ Tuple tuple = new VTuple(schema.getColumnNum());
+ Column col;
+ TajoDataTypes.DataType type;
+ for (int i =0; i < schema.getColumnNum(); i++) {
+ col = schema.getColumn(i);
+ type = col.getDataType();
+ switch (type.getType()) {
+ case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
+ case BIT:
+ byte b = bb.get();
+ if(b == 0) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ } else {
+ tuple.put(i, DatumFactory.createBit(b));
+ }
+ break;
+
+ case CHAR:
+ byte c = bb.get();
+ if(c == 0) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ } else {
+ tuple.put(i, DatumFactory.createChar(c));
+ }
+ break;
+
+ case INT2:
+ short s = bb.getShort();
+ if(s < Short.MIN_VALUE + 1) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ }else {
+ tuple.put(i, DatumFactory.createInt2(s));
+ }
+ break;
+
+ case INT4:
+ case DATE:
+ int i_ = bb.getInt();
+ if ( i_ < Integer.MIN_VALUE + 1) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ } else {
+ tuple.put(i, DatumFactory.createFromInt4(type, i_));
+ }
+ break;
+
+ case INT8:
+ case TIME:
+ case TIMESTAMP:
+ long l = bb.getLong();
+ if ( l < Long.MIN_VALUE + 1) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ }else {
+ tuple.put(i, DatumFactory.createFromInt8(type, l));
+ }
+ break;
+
+ case FLOAT4:
+ float f = bb.getFloat();
+ if (Float.isNaN(f)) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ }else {
+ tuple.put(i, DatumFactory.createFloat4(f));
+ }
+ break;
+
+ case FLOAT8:
+ double d = bb.getDouble();
+ if(Double.isNaN(d)) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ }else {
+ tuple.put(i, DatumFactory.createFloat8(d));
+ }
+ break;
+
+ case TEXT:
+ byte [] _string = new byte[bb.getInt()];
+ bb.get(_string);
+ String str = new String(_string);
+ if(str.compareTo("NULL") == 0) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ }else {
+ tuple.put(i, DatumFactory.createText(str));
+ }
+ break;
+
+ case BLOB:
+ byte [] _bytes = new byte[bb.getInt()];
+ bb.get(_bytes);
+ if(Bytes.compareTo(bytes, Bytes.toBytes("NULL")) == 0) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ } else {
+ tuple.put(i, DatumFactory.createBlob(_bytes));
+ }
+ break;
+
+ case INET4:
+ byte [] _ipv4 = new byte[4];
+ bb.get(_ipv4);
+ tuple.put(i, DatumFactory.createInet4(_ipv4));
+ break;
+ case INET6:
+ // TODO - to be implemented
+ }
+ }
+ return tuple;
+ }
+ }
+
+ public static class RowStoreEncoder {
+
+ public static byte [] toBytes(Schema schema, Tuple tuple) {
+ int size = StorageUtil.getRowByteSize(schema);
+ ByteBuffer bb = ByteBuffer.allocate(size);
+ Column col;
+ for (int i = 0; i < schema.getColumnNum(); i++) {
+ col = schema.getColumn(i);
+ switch (col.getDataType().getType()) {
+ case BOOLEAN: bb.put(tuple.get(i).asByte()); break;
+ case BIT: bb.put(tuple.get(i).asByte()); break;
+ case CHAR: bb.put(tuple.get(i).asByte()); break;
+ case INT2: bb.putShort(tuple.get(i).asInt2()); break;
+ case INT4: bb.putInt(tuple.get(i).asInt4()); break;
+ case INT8: bb.putLong(tuple.get(i).asInt8()); break;
+ case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break;
+ case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break;
+ case TEXT:
+ byte [] _string = tuple.get(i).asByteArray();
+ bb.putInt(_string.length);
+ bb.put(_string);
+ break;
+ case DATE: bb.putInt(tuple.get(i).asInt4()); break;
+ case TIMESTAMP: bb.putLong(((TimestampDatum)tuple.get(i)).getMillis()); break;
+ case BLOB:
+ byte [] bytes = tuple.get(i).asByteArray();
+ bb.putInt(bytes.length);
+ bb.put(bytes);
+ break;
+ case INET4:
+ byte [] ipBytes = tuple.getIPv4Bytes(i);
+ bb.put(ipBytes);
+ break;
+ case INET6: bb.put(tuple.getIPv6Bytes(i)); break;
+ default:
+ }
+ }
+
+ bb.flip();
+ byte [] buf = new byte [bb.limit()];
+ bb.get(buf);
+ return buf;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
new file mode 100644
index 0000000..6dca3f2
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaObject;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Scanner Interface
+ */
+public interface Scanner extends SchemaObject, Closeable {
+
+ void init() throws IOException;
+
+ /**
+ * It returns one tuple at each call.
+ *
+ * @return retrieve null if the scanner has no more tuples.
+ * Otherwise it returns one tuple.
+ *
+ * @throws IOException if internal I/O error occurs during next method
+ */
+ Tuple next() throws IOException;
+
+ /**
+ * Reset the cursor. After executed, the scanner
+ * will retrieve the first tuple.
+ *
+ * @throws IOException if internal I/O error occurs during reset method
+ */
+ void reset() throws IOException;
+
+ /**
+ * Close scanner
+ *
+ * @throws IOException if internal I/O error occurs during close method
+ */
+ void close() throws IOException;
+
+
+ /**
+ * It returns if the projection is executed in the underlying scanner layer.
+ *
+ * @return true if this scanner can project the given columns.
+ */
+ boolean isProjectable();
+
+ /**
+ * Set target columns
+ * @param targets columns to be projected
+ */
+ void setTarget(Column [] targets);
+
+ /**
+ * It returns if the selection is executed in the underlying scanner layer.
+ *
+ * @return true if this scanner can filter tuples against a given condition.
+ */
+ boolean isSelectable();
+
+ /**
+ * Set a search condition
+ * @param expr to be searched
+ *
+ * TODO - to be changed Object type
+ */
+ void setSearchCondition(Object expr);
+
+ /**
+ * It returns if the file is splittable.
+ *
+ * @return true if this scanner can split the a file.
+ */
+ boolean isSplittable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
new file mode 100644
index 0000000..894e7ee
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.io.IOException;
+
+public interface SeekableScanner extends Scanner {
+
+ public abstract long getNextOffset() throws IOException;
+
+ public abstract void seek(long offset) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
new file mode 100644
index 0000000..333f205
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+public interface SerializerDeserializer {
+
+ public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
+
+ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
new file mode 100644
index 0000000..3579674
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SplitLineReader extends LineReader {
+ public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
+ super(in, recordDelimiterBytes);
+ }
+
+ public SplitLineReader(InputStream in, Configuration conf,
+ byte[] recordDelimiterBytes) throws IOException {
+ super(in, conf, recordDelimiterBytes);
+ }
+
+ public boolean needAdditionalRecordAfterSplit() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
new file mode 100644
index 0000000..cc85c1d
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.IOException;
+
+public abstract class Storage {
+ protected final Configuration conf;
+
+ public Storage(final Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ public abstract Appender getAppender(TableMeta meta, Path path)
+ throws IOException;
+
+ public abstract Scanner openScanner(Schema schema, FileFragment[] tablets)
+ throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
new file mode 100644
index 0000000..1b852d4
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+
+/**
+ * StorageManager
+ */
+public class StorageManager extends AbstractStorageManager {
+
+ protected StorageManager(TajoConf conf) throws IOException {
+ super(conf);
+ }
+
+ @Override
+ public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException {
+ String handlerName = storeType.name().toLowerCase();
+ Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
+ if (scannerClass == null) {
+ scannerClass = conf.getClass(
+ String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()), null, Scanner.class);
+ SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+ }
+
+ if (scannerClass == null) {
+ throw new IOException("Unknown Storage Type: " + storeType.name());
+ }
+
+ return scannerClass;
+ }
+
+ @Override
+ public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+ Scanner scanner;
+
+ Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
+ scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
+ if (scanner.isProjectable()) {
+ scanner.setTarget(target.toArray());
+ }
+
+ return scanner;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
new file mode 100644
index 0000000..85bb861
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.v2.StorageManagerV2;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
+public class StorageManagerFactory {
+ private static final Map<String, AbstractStorageManager> storageManagers = Maps.newHashMap();
+
+ public static AbstractStorageManager getStorageManager(TajoConf conf) throws IOException {
+ return getStorageManager(conf, null);
+ }
+
+ public static synchronized AbstractStorageManager getStorageManager (
+ TajoConf conf, Path warehouseDir) throws IOException {
+ return getStorageManager(conf, warehouseDir, conf.getBoolVar(ConfVars.STORAGE_MANAGER_VERSION_2));
+ }
+
+ private static synchronized AbstractStorageManager getStorageManager (
+ TajoConf conf, Path warehouseDir, boolean v2) throws IOException {
+
+ URI uri;
+ TajoConf localConf = new TajoConf(conf);
+ if (warehouseDir != null) {
+ localConf.setVar(ConfVars.WAREHOUSE_DIR, warehouseDir.toUri().toString());
+ }
+
+ uri = TajoConf.getWarehouseDir(localConf).toUri();
+
+ String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
+
+ if(v2) {
+ key += "_v2";
+ }
+
+ if(storageManagers.containsKey(key)) {
+ AbstractStorageManager sm = storageManagers.get(key);
+ return sm;
+ } else {
+ AbstractStorageManager storageManager;
+
+ if(v2) {
+ storageManager = new StorageManagerV2(localConf);
+ } else {
+ storageManager = new StorageManager(localConf);
+ }
+
+ storageManagers.put(key, storageManager);
+
+ return storageManager;
+ }
+ }
+
+ public static synchronized SeekableScanner getSeekableScanner(
+ TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException {
+ return (SeekableScanner)getStorageManager(conf, null, false).getScanner(meta, schema, fragment, target);
+ }
+
+ public static synchronized SeekableScanner getSeekableScanner(
+ TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException {
+
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus status = fs.getFileStatus(path);
+ FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
+
+ return getSeekableScanner(conf, meta, schema, fragment, schema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
new file mode 100644
index 0000000..9627a5d
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.IOException;
+
+public class StorageUtil {
+ public static int getRowByteSize(Schema schema) {
+ int sum = 0;
+ for(Column col : schema.getColumns()) {
+ sum += StorageUtil.getColByteSize(col);
+ }
+
+ return sum;
+ }
+
+ public static int getColByteSize(Column col) {
+ switch(col.getDataType().getType()) {
+ case BOOLEAN: return 1;
+ case CHAR: return 1;
+ case BIT: return 1;
+ case INT2: return 2;
+ case INT4: return 4;
+ case INT8: return 8;
+ case FLOAT4: return 4;
+ case FLOAT8: return 8;
+ case INET4: return 4;
+ case INET6: return 32;
+ case TEXT: return 256;
+ case BLOB: return 256;
+ default: return 0;
+ }
+ }
+
+ public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException {
+ FileSystem fs = tableroot.getFileSystem(conf);
+ FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
+ FileUtil.writeProto(out, meta.getProto());
+ out.flush();
+ out.close();
+ }
+
+ public static Path concatPath(String parent, String...childs) {
+ return concatPath(new Path(parent), childs);
+ }
+
+ public static Path concatPath(Path parent, String...childs) {
+ StringBuilder sb = new StringBuilder();
+
+ for(int i=0; i < childs.length; i++) {
+ sb.append(childs[i]);
+ if(i < childs.length - 1)
+ sb.append("/");
+ }
+
+ return new Path(parent, sb.toString());
+ }
+}