You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:59 UTC
[25/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
deleted file mode 100644
index 66c610a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LineReader.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * A class that provides a line reader from an input stream.
- * Depending on the constructor used, lines will either be terminated by:
- * <ul>
- * <li>one of the following: '\n' (LF), '\r' (CR),
- * or '\r\n' (CR+LF).</li>
- * <li><em>or</em>, a custom byte sequence delimiter</li>
- * </ul>
- * In both cases, EOF also terminates an otherwise unterminated
- * line.
- *
- * <p>The reader buffers ahead of its caller, so more bytes may have been
- * pulled from the underlying stream than have been returned so far;
- * {@link #reset()} discards that buffered data.</p>
- */
-public class LineReader implements Closeable {
-  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-  private int bufferSize = DEFAULT_BUFFER_SIZE;
-  private InputStream in;
-  private byte[] buffer;
-  // the number of bytes of real data in the buffer
-  private int bufferLength = 0;
-  // the current position in the buffer
-  private int bufferPosn = 0;
-
-  private static final byte CR = '\r';
-  private static final byte LF = '\n';
-
-  // The custom line delimiter; null selects CR / LF / CRLF handling.
-  private final byte[] recordDelimiterBytes;
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * default buffer-size (64k).
-   *
-   * @param in The input stream
-   */
-  public LineReader(InputStream in) {
-    this(in, DEFAULT_BUFFER_SIZE);
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * given buffer-size.
-   *
-   * @param in The input stream
-   * @param bufferSize Size of the read buffer
-   */
-  public LineReader(InputStream in, int bufferSize) {
-    this.in = in;
-    this.bufferSize = bufferSize;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = null;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * <code>io.file.buffer.size</code> specified in the given
-   * <code>Configuration</code> (64k if unset).
-   *
-   * @param in input stream
-   * @param conf configuration
-   * @throws IOException declared for API compatibility
-   */
-  public LineReader(InputStream in, Configuration conf) throws IOException {
-    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * default buffer-size, and using a custom delimiter of array of
-   * bytes.
-   *
-   * @param in The input stream
-   * @param recordDelimiterBytes The delimiter
-   */
-  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
-    this.in = in;
-    this.bufferSize = DEFAULT_BUFFER_SIZE;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * given buffer-size, and using a custom delimiter of array of
-   * bytes.
-   *
-   * @param in The input stream
-   * @param bufferSize Size of the read buffer
-   * @param recordDelimiterBytes The delimiter
-   */
-  public LineReader(InputStream in, int bufferSize,
-                    byte[] recordDelimiterBytes) {
-    this.in = in;
-    this.bufferSize = bufferSize;
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-  /**
-   * Create a line reader that reads from the given stream using the
-   * <code>io.file.buffer.size</code> specified in the given
-   * <code>Configuration</code>, and using a custom delimiter of array of
-   * bytes.
-   *
-   * @param in input stream
-   * @param conf configuration
-   * @param recordDelimiterBytes The delimiter
-   * @throws IOException declared for API compatibility
-   */
-  public LineReader(InputStream in, Configuration conf,
-                    byte[] recordDelimiterBytes) throws IOException {
-    this.in = in;
-    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
-    this.buffer = new byte[this.bufferSize];
-    this.recordDelimiterBytes = recordDelimiterBytes;
-  }
-
-  /**
-   * Close the underlying stream.
-   *
-   * @throws IOException if closing the underlying stream fails
-   */
-  public void close() throws IOException {
-    in.close();
-  }
-
-  /**
-   * Discard any buffered data so that the next read refills the buffer
-   * from the underlying stream.
-   */
-  public void reset() {
-    bufferLength = 0;
-    bufferPosn = 0;
-  }
-
-  /**
-   * Read one line from the InputStream into the given Text.
-   *
-   * @param str the object to store the given line (without newline)
-   * @param maxLineLength the maximum number of bytes to store into str;
-   *  the rest of the line is silently discarded.
-   * @param maxBytesToConsume the maximum number of bytes to consume
-   *  in this call.  This is only a hint, because if the line cross
-   *  this threshold, we allow it to happen.  It can overshoot
-   *  potentially by as much as one buffer length.
-   * @return the number of bytes read including the (longest) newline
-   *  found.
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str, int maxLineLength,
-                      int maxBytesToConsume) throws IOException {
-    if (this.recordDelimiterBytes != null) {
-      return readCustomLine(str, maxLineLength, maxBytesToConsume);
-    } else {
-      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
-    }
-  }
-
-  /**
-   * Hook for subclasses: fill {@code buffer} from {@code in}.
-   *
-   * @param in the stream to read from
-   * @param buffer the buffer to fill
-   * @param inDelimiter true when the previous buffer ended inside a
-   *  possible line terminator (a pending CR, or a partial custom delimiter);
-   *  ignored by this default implementation
-   * @return the number of bytes read, or a value &lt;= 0 at EOF
-   * @throws IOException if the underlying stream throws
-   */
-  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
-      throws IOException {
-    return in.read(buffer);
-  }
-
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF.
-   */
-  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    /* We're reading data from in, but the head of the stream may be
-     * already buffered in buffer, so we have several cases:
-     * 1. No newline characters are in the buffer, so we need to copy
-     *    everything and read another buffer from the stream.
-     * 2. An unambiguously terminated line is in buffer, so we just
-     *    copy to str.
-     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-     *    in CR.  In this case we copy everything up to CR to str, but
-     *    we also need to see what follows CR: if it's LF, then we
-     *    need consume LF as well, so next call to readLine will read
-     *    from after that.
-     * We use a flag prevCharCR to signal if previous character was CR
-     * and, if it happens to be at the end of the buffer, delay
-     * consuming it until we have a chance to look at the char that
-     * follows.
-     */
-    str.clear();
-    int txtLength = 0; //tracks str.getLength(), as an optimization
-    int newlineLength = 0; //length of terminating newline
-    boolean prevCharCR = false; //true if prev char was CR
-    long bytesConsumed = 0;
-    do {
-      int startPosn = bufferPosn; //starting from where we left off the last time
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        if (prevCharCR) {
-          ++bytesConsumed; //account for CR from previous read
-        }
-        bufferLength = fillBuffer(in, buffer, prevCharCR);
-        if (bufferLength <= 0) {
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-        if (buffer[bufferPosn] == LF) {
-          newlineLength = (prevCharCR) ? 2 : 1;
-          ++bufferPosn; // at next invocation proceed from following byte
-          break;
-        }
-        if (prevCharCR) { //CR + notLF, we are at notLF
-          newlineLength = 1;
-          break;
-        }
-        prevCharCR = (buffer[bufferPosn] == CR);
-      }
-      int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0) {
-        --readLength; //CR at the end of the buffer
-      }
-      bytesConsumed += readLength;
-      int appendLength = readLength - newlineLength;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      if (appendLength > 0) {
-        str.append(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before newline: " + bytesConsumed);
-    }
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read a line terminated by one of CR, LF, or CRLF, appending it
-   * (without the terminator) to {@code str}.
-   *
-   * <p>Unlike the {@link Text} variant, {@code str} is not cleared first,
-   * and the custom delimiter (if any) is ignored. When at least one byte
-   * is consumed, the number of bytes appended for this line is added to
-   * {@code offsets}.</p>
-   *
-   * @param str the stream the line bytes are appended to
-   * @param offsets receives the appended length of the line
-   * @param maxLineLength the maximum number of bytes to append
-   * @param maxBytesToConsume soft limit on the bytes consumed by this call
-   * @return the number of bytes consumed, including the terminator
-   * @throws IOException if the underlying stream throws
-   */
-  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
-      , int maxBytesToConsume)
-      throws IOException {
-    // Same algorithm as readDefaultLine(Text, int, int); see the comment there.
-    int txtLength = 0; //tracks the bytes appended to str for this line
-    int newlineLength = 0; //length of terminating newline
-    boolean prevCharCR = false; //true if prev char was CR
-    long bytesConsumed = 0;
-    do {
-      int startPosn = bufferPosn; //starting from where we left off the last time
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        if (prevCharCR) {
-          ++bytesConsumed; //account for CR from previous read
-        }
-        bufferLength = fillBuffer(in, buffer, prevCharCR);
-        if (bufferLength <= 0) {
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-        if (buffer[bufferPosn] == LF) {
-          newlineLength = (prevCharCR) ? 2 : 1;
-          ++bufferPosn; // at next invocation proceed from following byte
-          break;
-        }
-        if (prevCharCR) { //CR + notLF, we are at notLF
-          newlineLength = 1;
-          break;
-        }
-        prevCharCR = (buffer[bufferPosn] == CR);
-      }
-      int readLength = bufferPosn - startPosn;
-      if (prevCharCR && newlineLength == 0) {
-        --readLength; //CR at the end of the buffer
-      }
-      bytesConsumed += readLength;
-      int appendLength = readLength - newlineLength;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      if (appendLength > 0) {
-        str.write(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before newline: " + bytesConsumed);
-    }
-
-    if (bytesConsumed > 0) offsets.add(txtLength);
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read a line terminated by a custom delimiter.
-   */
-  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    /* We're reading data from inputStream, but the head of the stream may be
-     * already captured in the previous buffer, so we have several cases:
-     *
-     * 1. The buffer tail does not contain any character sequence which
-     *    matches with the head of delimiter. We count it as a
-     *    ambiguous byte count = 0
-     *
-     * 2. The buffer tail contains a X number of characters,
-     *    that forms a sequence, which matches with the
-     *    head of delimiter. We count ambiguous byte count = X
-     *
-     *    // ***  eg: A segment of input file is as follows
-     *
-     *    " record 1792: I found this bug very interesting and
-     *     I have completely read about it. record 1793: This bug
-     *     can be solved easily record 1794: This ."
-     *
-     *    delimiter = "record";
-     *
-     *    supposing:- String at the end of buffer =
-     *    "I found this bug very interesting and I have completely re"
-     *    There for next buffer = "ad about it. record 179       ...."
-     *
-     *     The matching characters in the input
-     *     buffer tail and delimiter head = "re"
-     *     Therefore, ambiguous byte count = 2 ****   //
-     *
-     *     2.1 If the following bytes are the remaining characters of
-     *         the delimiter, then we have to capture only up to the starting
-     *         position of delimiter. That means, we need not include the
-     *         ambiguous characters in str.
-     *
-     *     2.2 If the following bytes are not the remaining characters of
-     *         the delimiter ( as mentioned in the example ),
-     *         then we have to include the ambiguous characters in str.
-     *
-     * Ambiguous bytes are subtracted from bytesConsumed when they are set
-     * aside and added back once their fate (delimiter or content) is known.
-     */
-    str.clear();
-    int txtLength = 0; // tracks str.getLength(), as an optimization
-    long bytesConsumed = 0;
-    int delPosn = 0;
-    int ambiguousByteCount = 0; // To capture the ambiguous characters count
-    do {
-      int startPosn = bufferPosn; // Start from previous end position
-      if (bufferPosn >= bufferLength) {
-        startPosn = bufferPosn = 0;
-        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
-        if (bufferLength <= 0) {
-          // At EOF the pending delimiter prefix is ordinary line content.
-          if (ambiguousByteCount > 0) {
-            str.append(recordDelimiterBytes, 0, ambiguousByteCount);
-            bytesConsumed += ambiguousByteCount;
-          }
-          break; // EOF
-        }
-      }
-      for (; bufferPosn < bufferLength; ++bufferPosn) {
-        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
-          delPosn++;
-          if (delPosn >= recordDelimiterBytes.length) {
-            bufferPosn++;
-            break;
-          }
-        } else if (delPosn != 0) {
-          // Restart the search one byte after where the failed partial match
-          // began (the loop increment supplies the +1). Backing up a single
-          // byte would miss delimiters with a repeated prefix, e.g. "aab" in
-          // "aaab". A match that began in the previous buffer restarts at 0.
-          bufferPosn -= delPosn;
-          if (bufferPosn < -1) {
-            bufferPosn = -1;
-          }
-          delPosn = 0;
-        }
-      }
-      int readLength = bufferPosn - startPosn;
-      bytesConsumed += readLength;
-      int appendLength = readLength - delPosn;
-      if (appendLength > maxLineLength - txtLength) {
-        appendLength = maxLineLength - txtLength;
-      }
-      // Re-count the bytes set aside at the end of the previous buffer; if
-      // they are still pending they are subtracted again below.
-      bytesConsumed += ambiguousByteCount;
-      if (appendLength >= 0 && ambiguousByteCount > 0) {
-        // The set-aside bytes were not the delimiter (refer case 2.2).
-        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
-        ambiguousByteCount = 0;
-      }
-      if (appendLength > 0) {
-        str.append(buffer, startPosn, appendLength);
-        txtLength += appendLength;
-      }
-      if (bufferPosn >= bufferLength) {
-        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
-          ambiguousByteCount = delPosn;
-          bytesConsumed -= ambiguousByteCount; //to be consumed in next
-        }
-      }
-    } while (delPosn < recordDelimiterBytes.length
-        && bytesConsumed < maxBytesToConsume);
-    if (bytesConsumed > (long) Integer.MAX_VALUE) {
-      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
-    }
-    return (int) bytesConsumed;
-  }
-
-  /**
-   * Read from the InputStream into the given Text.
-   *
-   * @param str the object to store the given line
-   * @param maxLineLength the maximum number of bytes to store into str.
-   * @return the number of bytes read including the newline
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str, int maxLineLength) throws IOException {
-    return readLine(str, maxLineLength, Integer.MAX_VALUE);
-  }
-
-  /**
-   * Read from the InputStream into the given Text.
-   *
-   * @param str the object to store the given line
-   * @return the number of bytes read including the newline
-   * @throws IOException if the underlying stream throws
-   */
-  public int readLine(Text str) throws IOException {
-    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
deleted file mode 100644
index f19b61f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.ClassSize;
-
-/**
- * Rough per-tuple heap-size estimation based on per-datum-class base sizes
- * computed once via {@code ClassSize.estimateBase}.
- */
-public class MemoryUtil {
-
-  /** Estimated base size of a NullDatum instance */
-  public static final long NULL_DATUM;
-
-  /** Estimated base size of a BooleanDatum instance */
-  public static final long BOOL_DATUM;
-
-  /** Estimated base size of a CharDatum instance */
-  public static final long CHAR_DATUM;
-
-  /** Estimated base size of a BitDatum instance */
-  public static final long BIT_DATUM;
-
-  /** Estimated base size of an Int2Datum instance */
-  public static final long INT2_DATUM;
-
-  /** Estimated base size of an Int4Datum instance */
-  public static final long INT4_DATUM;
-
-  /** Estimated base size of an Int8Datum instance */
-  public static final long INT8_DATUM;
-
-  /** Estimated base size of a Float4Datum instance */
-  public static final long FLOAT4_DATUM;
-
-  /** Estimated base size of a Float8Datum instance */
-  public static final long FLOAT8_DATUM;
-
-  /** Estimated base size of a TextDatum instance */
-  public static final long TEXT_DATUM;
-
-  /** Estimated base size of a BlobDatum instance */
-  public static final long BLOB_DATUM;
-
-  /** Estimated base size of a DateDatum instance */
-  public static final long DATE_DATUM;
-
-  /** Estimated base size of a TimeDatum instance */
-  public static final long TIME_DATUM;
-
-  /** Estimated base size of a TimestampDatum instance */
-  public static final long TIMESTAMP_DATUM;
-
-  static {
-    NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false);
-    CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false);
-    BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false);
-    BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false);
-    INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false);
-    INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false);
-    INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false);
-    FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false);
-    FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false);
-    TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false);
-    BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false);
-    DATE_DATUM = ClassSize.estimateBase(DateDatum.class, false);
-    TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false);
-    TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false);
-  }
-
-  /**
-   * Estimate the memory footprint of a tuple.
-   *
-   * <p>The estimate is one object overhead plus, for each datum, the base
-   * size of its datum class; variable-length types (CHAR, TEXT, BLOB) also
-   * add {@code datum.size()}. Types not listed here contribute nothing.</p>
-   *
-   * @param tuple the tuple to measure
-   * @return the estimated size in bytes
-   */
-  public static long calculateMemorySize(Tuple tuple) {
-    long total = ClassSize.OBJECT;
-    for (Datum datum : tuple.getValues()) {
-      switch (datum.type()) {
-
-      case NULL_TYPE:
-        total += NULL_DATUM;
-        break;
-
-      case BOOLEAN:
-        total += BOOL_DATUM;
-        break;
-
-      case BIT:
-        total += BIT_DATUM;
-        break;
-
-      case CHAR:
-        total += CHAR_DATUM + datum.size();
-        break;
-
-      case INT1: // INT1 values are accounted as Int2Datum
-      case INT2:
-        total += INT2_DATUM;
-        break;
-
-      case INT4:
-        total += INT4_DATUM;
-        break;
-
-      case INT8:
-        total += INT8_DATUM;
-        break;
-
-      case FLOAT4:
-        total += FLOAT4_DATUM;
-        break;
-
-      case FLOAT8:
-        total += FLOAT8_DATUM;
-        break;
-
-      case TEXT:
-        total += TEXT_DATUM + datum.size();
-        break;
-
-      case BLOB:
-        total += BLOB_DATUM + datum.size();
-        break;
-
-      case DATE:
-        total += DATE_DATUM;
-        break;
-
-      case TIME:
-        total += TIME_DATUM;
-        break;
-
-      case TIMESTAMP:
-        total += TIMESTAMP_DATUM;
-        break;
-
-      default:
-        break;
-      }
-    }
-
-    return total;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
deleted file mode 100644
index 4122c76..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Scans a list of file fragments one after another, in input order, as a
- * single logical scanner. Fragments whose end key is not positive are
- * skipped at construction time.
- */
-public class MergeScanner implements Scanner {
-  private Configuration conf;
-  private TableMeta meta;
-  private Schema schema;
-  private List<FileFragment> fragments;
-  private Iterator<FileFragment> iterator;
-  private FileFragment currentFragment;
-  private Scanner currentScanner;
-  private Tuple tuple;
-  private boolean projectable = false;
-  private boolean selectable = false;
-  private Schema target;
-  private float progress;
-  protected TableStats tableStats;
-
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList)
-      throws IOException {
-    this(conf, schema, meta, rawFragmentList, schema);
-  }
-
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList,
-                      Schema target)
-      throws IOException {
-    this.conf = conf;
-    this.schema = schema;
-    this.meta = meta;
-    this.target = target;
-
-    this.fragments = new ArrayList<FileFragment>();
-
-    long numBytes = 0;
-    for (FileFragment eachFileFragment : rawFragmentList) {
-      numBytes += eachFileFragment.getEndKey();
-      if (eachFileFragment.getEndKey() > 0) {
-        fragments.add(eachFileFragment);
-      }
-    }
-
-    // it should keep the input order. Otherwise, it causes wrong result of sort queries.
-    this.reset();
-
-    if (currentScanner != null) {
-      this.projectable = currentScanner.isProjectable();
-      this.selectable = currentScanner.isSelectable();
-    }
-
-    tableStats = new TableStats();
-
-    tableStats.setNumBytes(numBytes);
-    tableStats.setNumBlocks(fragments.size());
-
-    for (Column eachColumn : schema.getColumns()) {
-      ColumnStats columnStats = new ColumnStats(eachColumn);
-      tableStats.addColumnStat(columnStats);
-    }
-  }
-
-  @Override
-  public void init() throws IOException {
-    progress = 0.0f;
-  }
-
-  /**
-   * Return the next tuple, moving on to later fragments as earlier ones are
-   * exhausted.
-   *
-   * <p>Keeps advancing until a fragment yields a tuple, so a fragment that
-   * produces no tuples does not end the scan early.</p>
-   *
-   * @return the next tuple, or null once every fragment is exhausted
-   * @throws IOException if a fragment scanner fails
-   */
-  @Override
-  public Tuple next() throws IOException {
-    while (currentScanner != null) {
-      tuple = currentScanner.next();
-      if (tuple != null) {
-        return tuple;
-      }
-      Scanner finished = currentScanner;
-      // Drop the reference first so a failure below cannot lead to a double close.
-      currentScanner = null;
-      finished.close();
-      accumulateStats(finished.getInputStats());
-      currentScanner = getNextScanner();
-    }
-    tuple = null;
-    return null;
-  }
-
-  /** Fold a finished fragment scanner's read bytes and row count into the totals. */
-  private void accumulateStats(TableStats scannerTableStats) {
-    if (scannerTableStats != null) {
-      tableStats.setReadBytes(tableStats.getReadBytes() + scannerTableStats.getReadBytes());
-      tableStats.setNumRows(tableStats.getNumRows() + scannerTableStats.getNumRows());
-    }
-  }
-
-  /**
-   * Restart from the first fragment, closing any scanner that is still open.
-   */
-  @Override
-  public void reset() throws IOException {
-    if (currentScanner != null) {
-      Scanner previous = currentScanner;
-      currentScanner = null;
-      previous.close();
-    }
-    this.iterator = fragments.iterator();
-    this.currentScanner = getNextScanner();
-  }
-
-  /**
-   * Open and initialize a scanner for the next fragment.
-   *
-   * @return the new scanner (also stored in currentScanner), or null when
-   *  there are no more fragments or the scanner has been closed
-   */
-  private Scanner getNextScanner() throws IOException {
-    if (iterator == null || !iterator.hasNext()) {
-      return null;
-    }
-    currentFragment = iterator.next();
-    currentScanner = StorageManager.getStorageManager((TajoConf) conf).getScanner(meta, schema,
-        currentFragment, target);
-    currentScanner.init();
-    return currentScanner;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (currentScanner != null) {
-      currentScanner.close();
-      currentScanner = null;
-    }
-    iterator = null;
-    progress = 1.0f;
-  }
-
-  @Override
-  public boolean isProjectable() {
-    return projectable;
-  }
-
-  @Override
-  public void setTarget(Column[] targets) {
-    this.target = new Schema(targets);
-  }
-
-  @Override
-  public boolean isSelectable() {
-    return selectable;
-  }
-
-  @Override
-  public void setSearchCondition(Object expr) {
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public boolean isSplittable() {
-    return false;
-  }
-
-  /**
-   * Report progress as bytes read so far (finished fragments plus the current
-   * one) over the total fragment bytes; falls back to the progress field when
-   * no scan is active or the total is zero.
-   */
-  @Override
-  public float getProgress() {
-    if (currentScanner != null && iterator != null && tableStats.getNumBytes() > 0) {
-      TableStats scannerTableStats = currentScanner.getInputStats();
-      long currentScannerReadBytes = 0;
-      if (scannerTableStats != null) {
-        currentScannerReadBytes = scannerTableStats.getReadBytes();
-      }
-
-      return (float) (tableStats.getReadBytes() + currentScannerReadBytes) / (float) tableStats.getNumBytes();
-    } else {
-      return progress;
-    }
-  }
-
-  @Override
-  public TableStats getInputStats() {
-    return tableStats;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
deleted file mode 100644
index 4cec67d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.tajo.storage; /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-
-/**
- * A scanner that never returns any tuples, regardless of its fragment.
- */
-public class NullScanner extends FileScanner {
-  public NullScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) {
-    super(conf, schema, meta, fragment);
-  }
-
-  /**
-   * Always reports end of data and marks the scan complete.
-   *
-   * @return always null
-   */
-  @Override
-  public Tuple next() throws IOException {
-    progress = 1.0f;
-
-    return null;
-  }
-
-  /** Rewind: progress goes back to zero. */
-  @Override
-  public void reset() throws IOException {
-    progress = 0.0f;
-  }
-
-  /**
-   * Close the scanner. A closed scanner has nothing left to read, so it
-   * reports full progress, matching MergeScanner.close().
-   */
-  @Override
-  public void close() throws IOException {
-    progress = 1.0f;
-  }
-
-  @Override
-  public boolean isProjectable() {
-    return false;
-  }
-
-  @Override
-  public boolean isSelectable() {
-    return true;
-  }
-
-  @Override
-  public boolean isSplittable() {
-    return true;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
deleted file mode 100644
index 94d13ee..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.Path;
-
-import java.util.Comparator;
-
- /**
-  * Orders {@link Path}s by interpreting each path's name component as a decimal {@code int}.
-  *
-  * <p>{@link #compare} throws {@link NumberFormatException} (from {@link Integer#parseInt}) when a name is not
-  * a valid {@code int}.
-  */
- public class NumericPathComparator implements Comparator<Path> {
-
-   @Override
-   public int compare(Path p1, Path p2) {
-     int num1 = Integer.parseInt(p1.getName());
-     int num2 = Integer.parseInt(p2.getName());
-
-     // Never return num1 - num2: the subtraction overflows when the operands have opposite signs and large
-     // magnitudes (e.g. Integer.MIN_VALUE vs 1), which flips the sign and violates the Comparator contract.
-     return (num1 < num2) ? -1 : ((num1 == num2) ? 0 : 1);
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
deleted file mode 100644
index 2fae243..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ /dev/null
@@ -1,772 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.protobuf.Message;
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.BitArray;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
- /**
-  * RAW file format: a row-oriented binary format that is read and written through {@link java.io.File}
-  * based NIO channels, so scanned fragments and appended paths must resolve to a local file.
-  *
-  * <p>Every record is laid out as:
-  * <pre>
-  *   int     record size in bytes (header included)
-  *   short   length of the null-flag bitmap in bytes
-  *   byte[]  null-flag bitmap, one bit per column
-  *   values of the non-null columns, in schema order
-  * </pre>
-  * INT4/INT8 are ZigZag + varint encoded; CHAR/TEXT/BLOB/PROTOBUF are a varint length followed by the raw
-  * bytes; the other supported types use fixed-width ByteBuffer encodings.
-  */
- public class RawFile {
-   private static final Log LOG = LogFactory.getLog(RawFile.class);
-
-   /**
-    * Scanner over one fragment of a RAW file. Records are read through a direct buffer obtained from
-    * {@code BufferPool.directBuffer(64KB)} which is enlarged on demand when a record does not fit.
-    */
-   public static class RawFileScanner extends FileScanner implements SeekableScanner {
-     private FileChannel channel;
-     private DataType[] columnTypes;
-
-     private ByteBuffer buffer; // NIO view over buf
-     private ByteBuf buf;
-     private Tuple tuple;
-
-     private int headerSize = 0; // Header size of a tuple
-     private BitArray nullFlags;
-     private static final int RECORD_SIZE = 4;
-     private boolean eos = false;
-     private long startOffset;
-     private long endOffset;
-     private FileInputStream fis;
-     private long recordCount;
-     private long totalReadBytes;
-     private long filePosition; // channel offset just past the bytes fetched into the buffer
-     private boolean forceFillBuffer; // true when the buffer holds no valid data yet
-
-     public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
-       super(conf, schema, meta, fragment);
-     }
-
-     /**
-      * Opens the local file behind the fragment, positions the channel at the fragment start and
-      * allocates the read buffer.
-      *
-      * @throws IOException if the path cannot be mapped to a local file or the file cannot be opened
-      */
-     public void init() throws IOException {
-       File file;
-       try {
-         if (fragment.getPath().toUri().getScheme() != null) {
-           file = new File(fragment.getPath().toUri());
-         } else {
-           file = new File(fragment.getPath().toString());
-         }
-       } catch (IllegalArgumentException iae) {
-         throw new IOException(iae);
-       }
-
-       fis = new FileInputStream(file);
-       channel = fis.getChannel();
-       filePosition = startOffset = fragment.getStartKey();
-       // getEndKey() is used as the fragment length here (see the debug message below).
-       endOffset = fragment.getStartKey() + fragment.getEndKey();
-
-       if (LOG.isDebugEnabled()) {
-         LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size()
-             + ", fragment length :" + fragment.getEndKey());
-       }
-
-       buf = BufferPool.directBuffer(64 * StorageUnit.KB);
-       buffer = buf.nioBuffer(0, buf.capacity());
-
-       columnTypes = new DataType[schema.size()];
-       for (int i = 0; i < schema.size(); i++) {
-         columnTypes[i] = schema.getColumn(i).getDataType();
-       }
-
-       tuple = new VTuple(columnTypes.length);
-       nullFlags = new BitArray(schema.size());
-       headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize
-
-       // initial set position
-       if (fragment.getStartKey() > 0) {
-         channel.position(fragment.getStartKey());
-       }
-
-       forceFillBuffer = true;
-       super.init();
-     }
-
-     /** Offset of the next unread byte: bytes fetched so far minus the bytes still waiting in the buffer. */
-     @Override
-     public long getNextOffset() throws IOException {
-       return filePosition - (forceFillBuffer ? 0 : buffer.remaining());
-     }
-
-     /**
-      * Moves the read position to {@code offset}. If the offset lies inside the currently buffered window the
-      * buffer is simply repositioned; otherwise the channel is repositioned and the buffer refilled.
-      *
-      * @throws IndexOutOfBoundsException if a refill is needed and {@code offset} lies outside this fragment
-      */
-     @Override
-     public void seek(long offset) throws IOException {
-       eos = false;
-       filePosition = channel.position();
-
-       // do not fill the buffer if the offset is already included in the buffer.
-       if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){
-         buffer.position((int)(offset - (filePosition - buffer.limit())));
-       } else {
-         if(offset < startOffset || offset > startOffset + fragment.getEndKey()){
-           throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d",
-               startOffset, startOffset + fragment.getEndKey(), offset));
-         }
-         channel.position(offset);
-         filePosition = offset;
-         buffer.clear();
-         forceFillBuffer = true;
-         fillBuffer();
-       }
-     }
-
-     /**
-      * Reads more bytes from the channel, keeping any unconsumed bytes at the front of the buffer.
-      *
-      * @return false (and sets eos) if the channel is at end-of-file
-      */
-     private boolean fillBuffer() throws IOException {
-       if(!forceFillBuffer) buffer.compact();
-
-       int bytesRead = channel.read(buffer);
-       forceFillBuffer = false;
-       if (bytesRead == -1) {
-         eos = true;
-         return false;
-       } else {
-         buffer.flip(); // limit = number of valid bytes, position = 0
-         filePosition += bytesRead;
-         totalReadBytes += bytesRead;
-         return true;
-       }
-     }
-
-     /**
-      * Decode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers
-      * into values that can be efficiently encoded with varint. (Otherwise,
-      * negative values must be sign-extended to 64 bits to be varint encoded,
-      * thus always taking 10 bytes on the wire.)
-      *
-      * @param n An unsigned 32-bit integer, stored in a signed int because
-      *          Java has no explicit unsigned support.
-      * @return A signed 32-bit integer.
-      */
-     public static int decodeZigZag32(final int n) {
-       return (n >>> 1) ^ -(n & 1);
-     }
-
-     /**
-      * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers
-      * into values that can be efficiently encoded with varint. (Otherwise,
-      * negative values must be sign-extended to 64 bits to be varint encoded,
-      * thus always taking 10 bytes on the wire.)
-      *
-      * @param n An unsigned 64-bit integer, stored in a signed long because
-      *          Java has no explicit unsigned support.
-      * @return A signed 64-bit integer.
-      */
-     public static long decodeZigZag64(final long n) {
-       return (n >>> 1) ^ -(n & 1);
-     }
-
-     /**
-      * Read a raw Varint from the buffer. If larger than 32 bits, discard the
-      * upper bits.
-      *
-      * @throws IOException if the varint is longer than 10 bytes
-      */
-     public int readRawVarint32() throws IOException {
-       byte tmp = buffer.get();
-       if (tmp >= 0) {
-         return tmp;
-       }
-       int result = tmp & 0x7f;
-       if ((tmp = buffer.get()) >= 0) {
-         result |= tmp << 7;
-       } else {
-         result |= (tmp & 0x7f) << 7;
-         if ((tmp = buffer.get()) >= 0) {
-           result |= tmp << 14;
-         } else {
-           result |= (tmp & 0x7f) << 14;
-           if ((tmp = buffer.get()) >= 0) {
-             result |= tmp << 21;
-           } else {
-             result |= (tmp & 0x7f) << 21;
-             result |= (tmp = buffer.get()) << 28;
-             if (tmp < 0) {
-               // Discard upper 32 bits.
-               for (int i = 0; i < 5; i++) {
-                 if (buffer.get() >= 0) {
-                   return result;
-                 }
-               }
-               throw new IOException("Invalid Variable int32");
-             }
-           }
-         }
-       }
-       return result;
-     }
-
-     /**
-      * Read a raw Varint from the buffer.
-      *
-      * @throws IOException if no terminating byte is found within 64 bits
-      */
-     public long readRawVarint64() throws IOException {
-       int shift = 0;
-       long result = 0;
-       while (shift < 64) {
-         final byte b = buffer.get();
-         result |= (long)(b & 0x7F) << shift;
-         if ((b & 0x80) == 0) {
-           return result;
-         }
-         shift += 7;
-       }
-       throw new IOException("Invalid Variable int64");
-     }
-
-     /**
-      * Decodes the next record.
-      *
-      * @return a new tuple, or null once the end of the file has been reached
-      */
-     @Override
-     public Tuple next() throws IOException {
-       if(eos) return null;
-
-       if (forceFillBuffer || buffer.remaining() < headerSize) {
-         if (!fillBuffer()) {
-           return null;
-         }
-       }
-
-       // backup the buffer state
-       int bufferLimit = buffer.limit();
-       int recordSize = buffer.getInt();
-       int nullFlagSize = buffer.getShort();
-
-       buffer.limit(buffer.position() + nullFlagSize);
-       nullFlags.fromByteBuffer(buffer);
-       // restore the start of record contents
-       buffer.limit(bufferLimit);
-       if (buffer.remaining() < (recordSize - headerSize)) {
-
-         //if the buffer reaches the writable size, the buffer increase the record size
-         reSizeBuffer(recordSize);
-
-         if (!fillBuffer()) {
-           return null;
-         }
-       }
-
-       for (int i = 0; i < columnTypes.length; i++) {
-         // check if the i'th column is null
-         if (nullFlags.get(i)) {
-           tuple.put(i, DatumFactory.createNullDatum());
-           continue;
-         }
-
-         switch (columnTypes[i].getType()) {
-           case BOOLEAN :
-             tuple.put(i, DatumFactory.createBool(buffer.get()));
-             break;
-
-           case BIT :
-             tuple.put(i, DatumFactory.createBit(buffer.get()));
-             break;
-
-           case CHAR : {
-             int realLen = readRawVarint32();
-             // named charBytes so it does not shadow the ByteBuf field "buf"
-             byte[] charBytes = new byte[realLen];
-             buffer.get(charBytes);
-             tuple.put(i, DatumFactory.createChar(charBytes));
-             break;
-           }
-
-           case INT2 :
-             tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
-             break;
-
-           case INT4 :
-             tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
-             break;
-
-           case INT8 :
-             tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
-             break;
-
-           case FLOAT4 :
-             tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
-             break;
-
-           case FLOAT8 :
-             tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
-             break;
-
-           case TEXT : {
-             int len = readRawVarint32();
-             byte [] strBytes = new byte[len];
-             buffer.get(strBytes);
-             tuple.put(i, DatumFactory.createText(strBytes));
-             break;
-           }
-
-           case BLOB : {
-             int len = readRawVarint32();
-             byte [] rawBytes = new byte[len];
-             buffer.get(rawBytes);
-             tuple.put(i, DatumFactory.createBlob(rawBytes));
-             break;
-           }
-
-           case PROTOBUF: {
-             int len = readRawVarint32();
-             byte [] rawBytes = new byte[len];
-             buffer.get(rawBytes);
-
-             ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
-             Message.Builder builder = factory.newBuilder();
-             builder.mergeFrom(rawBytes);
-             tuple.put(i, factory.createDatum(builder.build()));
-             break;
-           }
-
-           case INET4 : {
-             byte [] ipv4Bytes = new byte[4];
-             buffer.get(ipv4Bytes);
-             tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
-             break;
-           }
-
-           case DATE: {
-             int val = buffer.getInt();
-             // Integer.MIN_VALUE is treated as a null marker.
-             if (val < Integer.MIN_VALUE + 1) {
-               tuple.put(i, DatumFactory.createNullDatum());
-             } else {
-               tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
-             }
-             break;
-           }
-           case TIME:
-           case TIMESTAMP: {
-             long val = buffer.getLong();
-             // Long.MIN_VALUE is treated as a null marker.
-             if (val < Long.MIN_VALUE + 1) {
-               tuple.put(i, DatumFactory.createNullDatum());
-             } else {
-               tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
-             }
-             break;
-           }
-           case NULL_TYPE:
-             tuple.put(i, NullDatum.get());
-             break;
-
-           default:
-         }
-       }
-
-       recordCount++;
-
-       if(filePosition - buffer.remaining() >= endOffset){
-         eos = true;
-       }
-       return new VTuple(tuple);
-     }
-
-     /**
-      * Grows the underlying ByteBuf (discarding already-consumed bytes) when fewer than
-      * {@code writableBytes} bytes are free, and rebuilds the NIO view over it.
-      */
-     private void reSizeBuffer(int writableBytes){
-       if (buffer.capacity() - buffer.remaining() < writableBytes) {
-         buf.setIndex(buffer.position(), buffer.limit());
-         buf.markReaderIndex();
-         buf.discardSomeReadBytes();
-         buf.ensureWritable(writableBytes);
-         buffer = buf.nioBuffer(0, buf.capacity());
-         buffer.limit(buf.writerIndex());
-       }
-     }
-
-     /** Rewinds to the start of the fragment. */
-     @Override
-     public void reset() throws IOException {
-       // reset the buffer
-       buffer.clear();
-       forceFillBuffer = true;
-       filePosition = fragment.getStartKey();
-       channel.position(filePosition);
-       eos = false;
-     }
-
-     /** Releases the read buffer and closes the channel and the file stream. */
-     @Override
-     public void close() throws IOException {
-       if(buf != null){
-         buffer.clear();
-         buffer = null;
-
-         buf.release();
-         buf = null;
-       }
-
-       IOUtils.cleanup(LOG, channel, fis);
-     }
-
-     @Override
-     public boolean isProjectable() {
-       return false;
-     }
-
-     @Override
-     public boolean isSelectable() {
-       return false;
-     }
-
-     @Override
-     public boolean isSplittable(){
-       return false;
-     }
-
-     @Override
-     public TableStats getInputStats() {
-       if(tableStats != null){
-         tableStats.setNumRows(recordCount);
-         tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n)
-         tableStats.setNumBytes(fragment.getEndKey());
-       }
-       return tableStats;
-     }
-
-     /**
-      * Fraction of this fragment's byte range [startOffset, endOffset) fetched so far, clamped to 1.0.
-      */
-     @Override
-     public float getProgress() {
-       if(eos) {
-         return 1.0f;
-       }
-
-       long consumed = filePosition - startOffset;
-       if (consumed == 0) {
-         return 0.0f;
-       }
-
-       // Progress must be relative to the fragment, not to the whole file: dividing the absolute
-       // filePosition by endOffset over-reports progress for any fragment that does not start at 0.
-       long length = endOffset - startOffset;
-       if (length <= 0) {
-         return 1.0f;
-       }
-       return Math.min(1.0f, ((float) consumed / length));
-     }
-   }
-
-   /**
-    * Appends records in the RAW format to a local file through a {@link RandomAccessFile} channel,
-    * staging them in a direct buffer obtained from {@code BufferPool.directBuffer(64KB)}.
-    */
-   public static class RawFileAppender extends FileAppender {
-     private FileChannel channel;
-     private RandomAccessFile randomAccessFile;
-     private DataType[] columnTypes;
-
-     private ByteBuffer buffer; // NIO view over buf
-     private ByteBuf buf;
-     private BitArray nullFlags;
-     private int headerSize = 0;
-     private static final int RECORD_SIZE = 4;
-     private long pos; // number of record bytes appended so far
-
-     private TableStatistics stats;
-
-     public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-       super(conf, schema, meta, path);
-     }
-
-     /**
-      * Opens the target local file for writing and allocates the write buffer.
-      *
-      * @throws IOException if the path cannot be mapped to a local file or the file cannot be opened
-      */
-     public void init() throws IOException {
-       File file;
-       try {
-         if (path.toUri().getScheme() != null) {
-           file = new File(path.toUri());
-         } else {
-           file = new File(path.toString());
-         }
-       } catch (IllegalArgumentException iae) {
-         throw new IOException(iae);
-       }
-
-       randomAccessFile = new RandomAccessFile(file, "rw");
-       channel = randomAccessFile.getChannel();
-       pos = 0;
-
-       columnTypes = new DataType[schema.size()];
-       for (int i = 0; i < schema.size(); i++) {
-         columnTypes[i] = schema.getColumn(i).getDataType();
-       }
-
-       buf = BufferPool.directBuffer(64 * StorageUnit.KB);
-       buffer = buf.nioBuffer(0, buf.capacity());
-
-       // compute the number of bytes representing the null flags
-       nullFlags = new BitArray(schema.size());
-       headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
-       if (enabledStats) {
-         this.stats = new TableStatistics(this.schema);
-       }
-
-       super.init();
-     }
-
-     @Override
-     public long getOffset() throws IOException {
-       return pos;
-     }
-
-     /** Writes every buffered byte to the channel and empties the buffer. */
-     private void flushBuffer() throws IOException {
-       buffer.flip();
-       channel.write(buffer);
-       buffer.clear();
-     }
-
-     /**
-      * If fewer than {@code sizeToBeWritten} bytes are free, writes the completed records in front of
-      * {@code recordOffset} to the channel and moves the partially written current record to the front.
-      *
-      * @return true if the buffer was flushed (the current record now starts at offset 0)
-      */
-     private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
-         throws IOException {
-
-       // if the buffer reaches the limit,
-       // write the bytes from 0 to the previous record.
-       if (buffer.remaining() < sizeToBeWritten) {
-
-         int limit = buffer.position();
-         buffer.limit(recordOffset);
-         buffer.flip();
-         channel.write(buffer);
-         buffer.position(recordOffset);
-         buffer.limit(limit);
-         buffer.compact();
-
-         return true;
-       } else {
-         return false;
-       }
-     }
-
-     /**
-      * Encode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers
-      * into values that can be efficiently encoded with varint. (Otherwise,
-      * negative values must be sign-extended to 64 bits to be varint encoded,
-      * thus always taking 10 bytes on the wire.)
-      *
-      * @param n A signed 32-bit integer.
-      * @return An unsigned 32-bit integer, stored in a signed int because
-      *         Java has no explicit unsigned support.
-      */
-     public static int encodeZigZag32(final int n) {
-       // Note: the right-shift must be arithmetic
-       return (n << 1) ^ (n >> 31);
-     }
-
-     /**
-      * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers
-      * into values that can be efficiently encoded with varint. (Otherwise,
-      * negative values must be sign-extended to 64 bits to be varint encoded,
-      * thus always taking 10 bytes on the wire.)
-      *
-      * @param n A signed 64-bit integer.
-      * @return An unsigned 64-bit integer, stored in a signed long because
-      *         Java has no explicit unsigned support.
-      */
-     public static long encodeZigZag64(final long n) {
-       // Note: the right-shift must be arithmetic
-       return (n << 1) ^ (n >> 63);
-     }
-
-     /**
-      * Encode and write a varint. {@code value} is treated as
-      * unsigned, so it won't be sign-extended if negative.
-      */
-     public void writeRawVarint32(int value) throws IOException {
-       while (true) {
-         if ((value & ~0x7F) == 0) {
-           buffer.put((byte) value);
-           return;
-         } else {
-           buffer.put((byte) ((value & 0x7F) | 0x80));
-           value >>>= 7;
-         }
-       }
-     }
-
-     /**
-      * Compute the number of bytes that would be needed to encode a varint.
-      * {@code value} is treated as unsigned, so it won't be sign-extended if
-      * negative.
-      */
-     public static int computeRawVarint32Size(final int value) {
-       if ((value & (0xffffffff << 7)) == 0) return 1;
-       if ((value & (0xffffffff << 14)) == 0) return 2;
-       if ((value & (0xffffffff << 21)) == 0) return 3;
-       if ((value & (0xffffffff << 28)) == 0) return 4;
-       return 5;
-     }
-
-     /** Encode and write a varint. */
-     public void writeRawVarint64(long value) throws IOException {
-       while (true) {
-         if ((value & ~0x7FL) == 0) {
-           buffer.put((byte) value);
-           return;
-         } else {
-           buffer.put((byte) ((value & 0x7F) | 0x80));
-           value >>>= 7;
-         }
-       }
-     }
-
-     /**
-      * Serializes one tuple: the column values are written first, behind a reserved header area, and the
-      * header (record size, null-flag length, null flags) is filled in afterwards.
-      *
-      * @throws IOException on write failure or if a column type is not supported
-      */
-     @Override
-     public void addTuple(Tuple t) throws IOException {
-
-       if (buffer.remaining() < headerSize) {
-         flushBuffer();
-       }
-
-       // skip the row header
-       int recordOffset = buffer.position();
-       buffer.position(recordOffset + headerSize);
-       // reset the null flags
-       nullFlags.clear();
-       for (int i = 0; i < schema.size(); i++) {
-         if (enabledStats) {
-           stats.analyzeField(i, t.get(i));
-         }
-
-         if (t.isNull(i)) {
-           nullFlags.set(i);
-           continue;
-         }
-
-         // 8 is the maximum bytes size of all types
-         if (flushBufferAndReplace(recordOffset, 8)) {
-           recordOffset = 0;
-         }
-
-         switch(columnTypes[i].getType()) {
-           case NULL_TYPE:
-             nullFlags.set(i);
-             continue;
-
-           case BOOLEAN:
-           case BIT:
-             buffer.put(t.getByte(i));
-             break;
-
-           case INT2 :
-             buffer.putShort(t.getInt2(i));
-             break;
-
-           case INT4 :
-             writeRawVarint32(encodeZigZag32(t.getInt4(i)));
-             break;
-
-           case INT8 :
-             writeRawVarint64(encodeZigZag64(t.getInt8(i)));
-             break;
-
-           case FLOAT4 :
-             buffer.putFloat(t.getFloat4(i));
-             break;
-
-           case FLOAT8 :
-             buffer.putDouble(t.getFloat8(i));
-             break;
-
-           case CHAR:
-           case TEXT: {
-             byte [] strBytes = t.getBytes(i);
-             if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) {
-               recordOffset = 0;
-             }
-             writeRawVarint32(strBytes.length);
-             buffer.put(strBytes);
-             break;
-           }
-
-           case DATE:
-             buffer.putInt(t.getInt4(i));
-             break;
-
-           case TIME:
-           case TIMESTAMP:
-             buffer.putLong(t.getInt8(i));
-             break;
-
-           case BLOB : {
-             byte [] rawBytes = t.getBytes(i);
-             if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
-               recordOffset = 0;
-             }
-             writeRawVarint32(rawBytes.length);
-             buffer.put(rawBytes);
-             break;
-           }
-
-           case PROTOBUF: {
-             byte [] rawBytes = t.getBytes(i);
-             if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
-               recordOffset = 0;
-             }
-             writeRawVarint32(rawBytes.length);
-             buffer.put(rawBytes);
-             break;
-           }
-
-           case INET4 :
-             buffer.put(t.getBytes(i));
-             break;
-
-           default:
-             throw new IOException("Cannot support data type: " + columnTypes[i].getType());
-         }
-       }
-
-       // write a record header
-       int bufferPos = buffer.position();
-       buffer.position(recordOffset);
-       buffer.putInt(bufferPos - recordOffset);
-       byte [] flags = nullFlags.toArray();
-       buffer.putShort((short) flags.length);
-       buffer.put(flags);
-
-       pos += bufferPos - recordOffset;
-       buffer.position(bufferPos);
-
-       if (enabledStats) {
-         stats.incrementRow();
-       }
-     }
-
-     @Override
-     public void flush() throws IOException {
-       if(buffer != null){
-         flushBuffer();
-       }
-     }
-
-     /** Flushes pending records, releases the write buffer and closes the channel and the file. */
-     @Override
-     public void close() throws IOException {
-       flush();
-       if (enabledStats) {
-         stats.setNumBytes(getOffset());
-       }
-       if (LOG.isDebugEnabled()) {
-         LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
-       }
-
-       if(buf != null){
-         buffer.clear();
-         buffer = null;
-
-         buf.release();
-         buf = null;
-       }
-
-       IOUtils.cleanup(LOG, channel, randomAccessFile);
-     }
-
-     @Override
-     public TableStats getStats() {
-       if (enabledStats) {
-         stats.setNumBytes(pos);
-         return stats.getTableStat();
-       } else {
-         return null;
-       }
-     }
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
deleted file mode 100644
index db36771..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BitArray;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-
- /**
-  * ROWFILE format: a row-oriented binary format stored on a Hadoop {@link FileSystem}.
-  *
-  * <p>The file begins with a header (int sync interval, 16-byte sync marker). Each record is a short
-  * null-flag length, the null-flag bitmap, a short data length and the data bytes. A sync block
-  * ({@code SYNC_ESCAPE} followed by the marker) is emitted periodically and at close so that a scanner
-  * starting in the middle of the file can re-align on a record boundary.
-  */
- public class RowFile {
-   public static final Log LOG = LogFactory.getLog(RowFile.class);
-
-   private static final int SYNC_ESCAPE = -1;
-   private static final int SYNC_HASH_SIZE = 16;
-   private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
-   private final static int DEFAULT_BUFFER_SIZE = 65535;
-   // NOTE(review): mutable, shared across every scanner/appender in the JVM; kept public for compatibility.
-   public static int SYNC_INTERVAL;
-
-   /** Scanner over one fragment of a ROWFILE. */
-   public static class RowFileScanner extends FileScanner {
-     private FileSystem fs;
-     private FSDataInputStream in;
-     private Tuple tuple;
-
-     private byte[] sync = new byte[SYNC_HASH_SIZE];
-     private byte[] checkSync = new byte[SYNC_HASH_SIZE];
-     private long start, end;
-
-     private ByteBuffer buffer;
-     private final int tupleHeaderSize;
-     private BitArray nullFlags;
-     private long bufferStartPos; // file offset corresponding to buffer index 0
-
-     public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
-         throws IOException {
-       super(conf, schema, meta, fragment);
-
-       SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
-           ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE;
-
-       nullFlags = new BitArray(schema.size());
-       tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
-       this.start = fragment.getStartKey();
-       this.end = this.start + fragment.getEndKey();
-     }
-
-     /**
-      * Opens the file, reads the header and, for fragments not starting at offset 0, skips forward to the
-      * first sync marker so that scanning begins on a record boundary.
-      */
-     public void init() throws IOException {
-       // set default page size.
-       fs = fragment.getPath().getFileSystem(conf);
-       in = fs.open(fragment.getPath());
-       // At least one unit, so a zero-column schema does not produce a zero-capacity buffer
-       // (the refill loops could then never make progress).
-       buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * Math.max(1, schema.size()));
-       buffer.flip();
-
-       readHeader();
-
-       // find the correct position from the start
-       if (this.start > in.getPos()) {
-         long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
-         in.seek(realStart);
-       }
-       bufferStartPos = in.getPos();
-       fillBuffer();
-
-       if (start != 0) {
-         // TODO: improve
-         boolean syncFound = false;
-         while (!syncFound) {
-           if (!ensureRemaining(SYNC_SIZE)) {
-             // EOF before another sync marker: leave the buffer empty so that next() returns null
-             // instead of failing with a BufferUnderflowException.
-             buffer.position(buffer.limit());
-             break;
-           }
-           buffer.mark();
-           syncFound = checkSync();
-           if (!syncFound) {
-             buffer.reset();
-             buffer.get(); // proceed one byte
-           }
-         }
-         bufferStartPos += buffer.position();
-         buffer.compact();
-         buffer.flip();
-       }
-
-       super.init();
-     }
-
-     private void readHeader() throws IOException {
-       SYNC_INTERVAL = in.readInt();
-       StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
-     }
-
-     /**
-      * Find the sync from the front of the buffer
-      *
-      * @return return true if it succeeds to find the sync.
-      * @throws IOException
-      */
-     private boolean checkSync() throws IOException {
-       buffer.getInt(); // escape
-       buffer.get(checkSync, 0, SYNC_HASH_SIZE); // sync
-       return Arrays.equals(checkSync, sync);
-     }
-
-     /**
-      * Discards consumed bytes and reads more data (up to two reads).
-      *
-      * @return the number of bytes read, or -1 at end-of-file
-      */
-     private int fillBuffer() throws IOException {
-       bufferStartPos += buffer.position();
-       buffer.compact();
-       int remain = buffer.remaining();
-       int read = in.read(buffer);
-       if (read == -1) {
-         buffer.flip();
-         return read;
-       } else {
-         int totalRead = read;
-         if (remain > totalRead) {
-           read = in.read(buffer);
-           totalRead += read > 0 ? read : 0;
-         }
-         buffer.flip();
-         return totalRead;
-       }
-     }
-
-     /**
-      * Refills the buffer until at least {@code n} bytes are available.
-      *
-      * @return false if end-of-file was reached first
-      */
-     private boolean ensureRemaining(int n) throws IOException {
-       while (buffer.remaining() < n) {
-         if (fillBuffer() < 0) {
-           return false;
-         }
-       }
-       return true;
-     }
-
-     /**
-      * Decodes the next record.
-      *
-      * @return a new tuple, or null at end-of-file or once a sync marker past the fragment end is reached
-      */
-     @Override
-     public Tuple next() throws IOException {
-       if (!ensureRemaining(SYNC_SIZE)) {
-         return null;
-       }
-
-       // skip an optional sync block; stop once it lies beyond this fragment
-       buffer.mark();
-       if (!checkSync()) {
-         buffer.reset();
-       } else {
-         if (bufferStartPos + buffer.position() > end) {
-           return null;
-         }
-       }
-
-       if (!ensureRemaining(tupleHeaderSize)) {
-         return null;
-       }
-
-       int i;
-       tuple = new VTuple(schema.size());
-
-       int nullFlagSize = buffer.getShort();
-       byte[] nullFlagBytes = new byte[nullFlagSize];
-       buffer.get(nullFlagBytes, 0, nullFlagSize);
-       nullFlags = new BitArray(nullFlagBytes);
-       // The appender stores the data length with writeShort(); read it back as unsigned so that
-       // lengths from 32768 to 65535 are not misread as negative values.
-       int tupleSize = buffer.getShort() & 0xFFFF;
-
-       if (!ensureRemaining(tupleSize)) {
-         return null;
-       }
-
-       Datum datum;
-       Column col;
-       for (i = 0; i < schema.size(); i++) {
-         if (!nullFlags.get(i)) {
-           col = schema.getColumn(i);
-           switch (col.getDataType().getType()) {
-             case BOOLEAN :
-               datum = DatumFactory.createBool(buffer.get());
-               tuple.put(i, datum);
-               break;
-
-             case BIT:
-               datum = DatumFactory.createBit(buffer.get());
-               tuple.put(i, datum );
-               break;
-
-             case CHAR : {
-               // fixed-width slot of getLength() bytes, preceded by the real length
-               int realLen = buffer.getInt();
-               byte[] buf = new byte[col.getDataType().getLength()];
-               buffer.get(buf);
-               byte[] charBuf = Arrays.copyOf(buf, realLen);
-               tuple.put(i, DatumFactory.createChar(charBuf));
-               break;
-             }
-
-             case INT2 :
-               datum = DatumFactory.createInt2(buffer.getShort());
-               tuple.put(i, datum );
-               break;
-
-             case INT4 :
-               datum = DatumFactory.createInt4(buffer.getInt());
-               tuple.put(i, datum );
-               break;
-
-             case INT8 :
-               datum = DatumFactory.createInt8(buffer.getLong());
-               tuple.put(i, datum );
-               break;
-
-             case FLOAT4 :
-               datum = DatumFactory.createFloat4(buffer.getFloat());
-               tuple.put(i, datum);
-               break;
-
-             case FLOAT8 :
-               datum = DatumFactory.createFloat8(buffer.getDouble());
-               tuple.put(i, datum);
-               break;
-
-             case TEXT: {
-               // length written via putShort((short) len): decode as unsigned
-               int bytelen = buffer.getShort() & 0xFFFF;
-               byte[] strbytes = new byte[bytelen];
-               buffer.get(strbytes, 0, bytelen);
-               datum = DatumFactory.createText(strbytes);
-               tuple.put(i, datum);
-               break;
-             }
-
-             case BLOB: {
-               // length written via putShort((short) len): decode as unsigned
-               int bytesLen = buffer.getShort() & 0xFFFF;
-               byte [] bytesBuf = new byte[bytesLen];
-               buffer.get(bytesBuf);
-               datum = DatumFactory.createBlob(bytesBuf);
-               tuple.put(i, datum);
-               break;
-             }
-
-             case INET4 : {
-               byte[] ipv4 = new byte[4];
-               buffer.get(ipv4, 0, 4);
-               datum = DatumFactory.createInet4(ipv4);
-               tuple.put(i, datum);
-               break;
-             }
-
-             default:
-               break;
-           }
-         } else {
-           tuple.put(i, DatumFactory.createNullDatum());
-         }
-       }
-       return tuple;
-     }
-
-     /** Restarts the scan from the fragment start. */
-     @Override
-     public void reset() throws IOException {
-       // init() opens a fresh stream; close the current one first so repeated resets do not leak handles.
-       if (in != null) {
-         in.close();
-       }
-       init();
-     }
-
-     @Override
-     public void close() throws IOException {
-       if (in != null) {
-         in.close();
-       }
-     }
-
-     @Override
-     public boolean isProjectable() {
-       return false;
-     }
-
-     @Override
-     public boolean isSelectable() {
-       return false;
-     }
-
-     @Override
-     public boolean isSplittable(){
-       return true;
-     }
-   }
-
-   /** Writes tuples to a new ROWFILE. */
-   public static class RowFileAppender extends FileAppender {
-     private FSDataOutputStream out;
-     private long lastSyncPos;
-     private FileSystem fs;
-     private byte[] sync;
-     private ByteBuffer buffer;
-     // Per-appender copy of the sync interval; the static SYNC_INTERVAL can be overwritten by any scanner.
-     private int syncInterval;
-
-     private BitArray nullFlags;
-     // statistics
-     private TableStatistics stats;
-
-     public RowFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
-         throws IOException {
-       super(conf, schema, meta, path);
-     }
-
-     /**
-      * Creates the output file and writes the header.
-      *
-      * @throws FileNotFoundException if the parent directory does not exist
-      * @throws AlreadyExistsStorageException if the target file already exists
-      */
-     public void init() throws IOException {
-       // NOTE(review): unlike RawFileAppender.init(), this does not call super.init(); confirm whether
-       // FileAppender relies on it.
-       syncInterval = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
-           ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal);
-       SYNC_INTERVAL = syncInterval; // keep the public static in sync for existing readers of it
-       fs = path.getFileSystem(conf);
-
-       if (!fs.exists(path.getParent())) {
-         throw new FileNotFoundException(path.toString());
-       }
-
-       if (fs.exists(path)) {
-         throw new AlreadyExistsStorageException(path);
-       }
-
-       sync = new byte[SYNC_HASH_SIZE];
-       lastSyncPos = 0;
-
-       out = fs.create(path);
-
-       MessageDigest md;
-       try {
-         md = MessageDigest.getInstance("MD5");
-         md.update((path.toString()+System.currentTimeMillis()).getBytes());
-         sync = md.digest();
-       } catch (NoSuchAlgorithmException e) {
-         LOG.error(e);
-       }
-
-       writeHeader();
-
-       buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-
-       nullFlags = new BitArray(schema.size());
-
-       if (enabledStats) {
-         this.stats = new TableStatistics(this.schema);
-       }
-     }
-
-     private void writeHeader() throws IOException {
-       out.writeInt(syncInterval);
-       out.write(sync);
-       out.flush();
-       lastSyncPos = out.getPos();
-     }
-
-     /**
-      * Serializes one tuple: null-flag length, null flags, data length, then the column data.
-      */
-     @Override
-     public void addTuple(Tuple t) throws IOException {
-       checkAndWriteSync();
-       Column col;
-
-       buffer.clear();
-       nullFlags.clear();
-
-       for (int i = 0; i < schema.size(); i++) {
-         if (enabledStats) {
-           stats.analyzeField(i, t.get(i));
-         }
-
-         if (t.isNull(i)) {
-           nullFlags.set(i);
-         } else {
-           col = schema.getColumn(i);
-           switch (col.getDataType().getType()) {
-             case BOOLEAN:
-               buffer.put(t.get(i).asByte());
-               break;
-             case BIT:
-               buffer.put(t.get(i).asByte());
-               break;
-             case CHAR:
-               byte[] src = t.get(i).asByteArray();
-               byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
-               buffer.putInt(src.length);
-               buffer.put(dst);
-               break;
-             case TEXT:
-               byte [] strbytes = t.get(i).asByteArray();
-               buffer.putShort((short)strbytes.length);
-               buffer.put(strbytes, 0, strbytes.length);
-               break;
-             case INT2:
-               buffer.putShort(t.get(i).asInt2());
-               break;
-             case INT4:
-               buffer.putInt(t.get(i).asInt4());
-               break;
-             case INT8:
-               buffer.putLong(t.get(i).asInt8());
-               break;
-             case FLOAT4:
-               buffer.putFloat(t.get(i).asFloat4());
-               break;
-             case FLOAT8:
-               buffer.putDouble(t.get(i).asFloat8());
-               break;
-             case BLOB:
-               byte [] bytes = t.get(i).asByteArray();
-               buffer.putShort((short)bytes.length);
-               buffer.put(bytes);
-               break;
-             case INET4:
-               buffer.put(t.get(i).asByteArray());
-               break;
-             case INET6:
-               buffer.put(t.get(i).asByteArray());
-               break;
-             case NULL_TYPE:
-               nullFlags.set(i);
-               break;
-             default:
-               break;
-           }
-         }
-       }
-
-       byte[] bytes = nullFlags.toArray();
-       out.writeShort(bytes.length);
-       out.write(bytes);
-
-       bytes = buffer.array();
-       int dataLen = buffer.position();
-       out.writeShort(dataLen);
-       out.write(bytes, 0, dataLen);
-
-       // Statistical section
-       if (enabledStats) {
-         stats.incrementRow();
-       }
-     }
-
-     @Override
-     public long getOffset() throws IOException {
-       return out.getPos();
-     }
-
-     @Override
-     public void flush() throws IOException {
-       out.flush();
-     }
-
-     /** Writes a trailing sync block, then flushes and closes the stream. */
-     @Override
-     public void close() throws IOException {
-       if (out != null) {
-         sync();
-         out.flush();
-         // measured after the trailing sync block so numBytes matches the final file length
-         if (enabledStats) {
-           stats.setNumBytes(out.getPos());
-         }
-         out.close();
-       }
-     }
-
-     /** Writes a sync block unless one was just written at the current position. */
-     private void sync() throws IOException {
-       if (lastSyncPos != out.getPos()) {
-         out.writeInt(SYNC_ESCAPE);
-         out.write(sync);
-         lastSyncPos = out.getPos();
-       }
-     }
-
-     private void checkAndWriteSync() throws IOException {
-       if (out.getPos() >= lastSyncPos + syncInterval) {
-         sync();
-       }
-     }
-
-     @Override
-     public TableStats getStats() {
-       if (enabledStats) {
-         return stats.getTableStat();
-       } else {
-         return null;
-       }
-     }
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
deleted file mode 100644
index 24b6280..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.exception.UnknownDataTypeException;
-import org.apache.tajo.tuple.offheap.RowWriter;
-import org.apache.tajo.util.BitArray;
-
-import java.nio.ByteBuffer;
-
-public class RowStoreUtil {
- public static int[] getTargetIds(Schema inSchema, Schema outSchema) {
- int[] targetIds = new int[outSchema.size()];
- int i = 0;
- for (Column target : outSchema.getColumns()) {
- targetIds[i] = inSchema.getColumnId(target.getQualifiedName());
- i++;
- }
-
- return targetIds;
- }
-
- public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
- out.clear();
- for (int idx = 0; idx < targetIds.length; idx++) {
- out.put(idx, in.get(targetIds[idx]));
- }
- return out;
- }
-
- public static RowStoreEncoder createEncoder(Schema schema) {
- return new RowStoreEncoder(schema);
- }
-
- public static RowStoreDecoder createDecoder(Schema schema) {
- return new RowStoreDecoder(schema);
- }
-
- public static class RowStoreDecoder {
-
- private Schema schema;
- private BitArray nullFlags;
- private int headerSize;
-
- private RowStoreDecoder(Schema schema) {
- this.schema = schema;
- nullFlags = new BitArray(schema.size());
- headerSize = nullFlags.bytesLength();
- }
-
-
- public Tuple toTuple(byte [] bytes) {
- nullFlags.clear();
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- Tuple tuple = new VTuple(schema.size());
- Column col;
- TajoDataTypes.DataType type;
-
- bb.limit(headerSize);
- nullFlags.fromByteBuffer(bb);
- bb.limit(bytes.length);
-
- for (int i =0; i < schema.size(); i++) {
- if (nullFlags.get(i)) {
- tuple.put(i, DatumFactory.createNullDatum());
- continue;
- }
-
- col = schema.getColumn(i);
- type = col.getDataType();
- switch (type.getType()) {
- case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
- case BIT:
- byte b = bb.get();
- tuple.put(i, DatumFactory.createBit(b));
- break;
-
- case CHAR:
- byte c = bb.get();
- tuple.put(i, DatumFactory.createChar(c));
- break;
-
- case INT2:
- short s = bb.getShort();
- tuple.put(i, DatumFactory.createInt2(s));
- break;
-
- case INT4:
- case DATE:
- int i_ = bb.getInt();
- tuple.put(i, DatumFactory.createFromInt4(type, i_));
- break;
-
- case INT8:
- case TIME:
- case TIMESTAMP:
- long l = bb.getLong();
- tuple.put(i, DatumFactory.createFromInt8(type, l));
- break;
-
- case INTERVAL:
- int month = bb.getInt();
- long milliseconds = bb.getLong();
- tuple.put(i, new IntervalDatum(month, milliseconds));
- break;
-
- case FLOAT4:
- float f = bb.getFloat();
- tuple.put(i, DatumFactory.createFloat4(f));
- break;
-
- case FLOAT8:
- double d = bb.getDouble();
- tuple.put(i, DatumFactory.createFloat8(d));
- break;
-
- case TEXT:
- byte [] _string = new byte[bb.getInt()];
- bb.get(_string);
- tuple.put(i, DatumFactory.createText(_string));
- break;
-
- case BLOB:
- byte [] _bytes = new byte[bb.getInt()];
- bb.get(_bytes);
- tuple.put(i, DatumFactory.createBlob(_bytes));
- break;
-
- case INET4:
- byte [] _ipv4 = new byte[4];
- bb.get(_ipv4);
- tuple.put(i, DatumFactory.createInet4(_ipv4));
- break;
- case INET6:
- // TODO - to be implemented
- throw new UnsupportedException(type.getType().name());
- default:
- throw new RuntimeException(new UnknownDataTypeException(type.getType().name()));
- }
- }
- return tuple;
- }
-
- public Schema getSchema() {
- return schema;
- }
- }
-
- public static class RowStoreEncoder {
- private Schema schema;
- private BitArray nullFlags;
- private int headerSize;
-
- private RowStoreEncoder(Schema schema) {
- this.schema = schema;
- nullFlags = new BitArray(schema.size());
- headerSize = nullFlags.bytesLength();
- }
-
- public byte[] toBytes(Tuple tuple) {
- nullFlags.clear();
- int size = estimateTupleDataSize(tuple);
- ByteBuffer bb = ByteBuffer.allocate(size + headerSize);
- bb.position(headerSize);
- Column col;
- for (int i = 0; i < schema.size(); i++) {
- if (tuple.isNull(i)) {
- nullFlags.set(i);
- continue;
- }
-
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case NULL_TYPE:
- nullFlags.set(i);
- break;
- case BOOLEAN:
- bb.put(tuple.get(i).asByte());
- break;
- case BIT:
- bb.put(tuple.get(i).asByte());
- break;
- case CHAR:
- bb.put(tuple.get(i).asByte());
- break;
- case INT2:
- bb.putShort(tuple.get(i).asInt2());
- break;
- case INT4:
- bb.putInt(tuple.get(i).asInt4());
- break;
- case INT8:
- bb.putLong(tuple.get(i).asInt8());
- break;
- case FLOAT4:
- bb.putFloat(tuple.get(i).asFloat4());
- break;
- case FLOAT8:
- bb.putDouble(tuple.get(i).asFloat8());
- break;
- case TEXT:
- byte[] _string = tuple.get(i).asByteArray();
- bb.putInt(_string.length);
- bb.put(_string);
- break;
- case DATE:
- bb.putInt(tuple.get(i).asInt4());
- break;
- case TIME:
- case TIMESTAMP:
- bb.putLong(tuple.get(i).asInt8());
- break;
- case INTERVAL:
- IntervalDatum interval = (IntervalDatum) tuple.get(i);
- bb.putInt(interval.getMonths());
- bb.putLong(interval.getMilliSeconds());
- break;
- case BLOB:
- byte[] bytes = tuple.get(i).asByteArray();
- bb.putInt(bytes.length);
- bb.put(bytes);
- break;
- case INET4:
- byte[] ipBytes = tuple.get(i).asByteArray();
- bb.put(ipBytes);
- break;
- case INET6:
- bb.put(tuple.get(i).asByteArray());
- break;
- default:
- throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
- }
- }
-
- byte[] flags = nullFlags.toArray();
- int finalPosition = bb.position();
- bb.position(0);
- bb.put(flags);
-
- bb.position(finalPosition);
- bb.flip();
- byte[] buf = new byte[bb.limit()];
- bb.get(buf);
- return buf;
- }
-
- // Note that, NULL values are treated separately
- private int estimateTupleDataSize(Tuple tuple) {
- int size = 0;
- Column col;
-
- for (int i = 0; i < schema.size(); i++) {
- if (tuple.isNull(i)) {
- continue;
- }
-
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- case BIT:
- case CHAR:
- size += 1;
- break;
- case INT2:
- size += 2;
- break;
- case DATE:
- case INT4:
- case FLOAT4:
- size += 4;
- break;
- case TIME:
- case TIMESTAMP:
- case INT8:
- case FLOAT8:
- size += 8;
- break;
- case INTERVAL:
- size += 12;
- break;
- case TEXT:
- case BLOB:
- size += (4 + tuple.get(i).asByteArray().length);
- break;
- case INET4:
- case INET6:
- size += tuple.get(i).asByteArray().length;
- break;
- default:
- throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
- }
- }
-
- size += 100; // optimistic reservation
-
- return size;
- }
-
- public Schema getSchema() {
- return schema;
- }
- }
-
- public static void convert(Tuple tuple, RowWriter writer) {
- writer.startRow();
-
- for (int i = 0; i < writer.dataTypes().length; i++) {
- if (tuple.isNull(i)) {
- writer.skipField();
- continue;
- }
- switch (writer.dataTypes()[i].getType()) {
- case BOOLEAN:
- writer.putBool(tuple.getBool(i));
- break;
- case INT1:
- case INT2:
- writer.putInt2(tuple.getInt2(i));
- break;
- case INT4:
- case DATE:
- case INET4:
- writer.putInt4(tuple.getInt4(i));
- break;
- case INT8:
- case TIMESTAMP:
- case TIME:
- writer.putInt8(tuple.getInt8(i));
- break;
- case FLOAT4:
- writer.putFloat4(tuple.getFloat4(i));
- break;
- case FLOAT8:
- writer.putFloat8(tuple.getFloat8(i));
- break;
- case TEXT:
- writer.putText(tuple.getBytes(i));
- break;
- case INTERVAL:
- writer.putInterval((IntervalDatum) tuple.getInterval(i));
- break;
- case PROTOBUF:
- writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
- break;
- case NULL_TYPE:
- writer.skipField();
- break;
- default:
- throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]);
- }
- }
- writer.endRow();
- }
-}