You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/12/18 07:03:05 UTC
git commit: TAJO-424: Make serializer/deserializer configurable in CSVFile. (jinho)
Updated Branches:
refs/heads/master 775cfaaad -> 2c53ccc11
TAJO-424: Make serializer/deserializer configurable in CSVFile. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/2c53ccc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/2c53ccc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/2c53ccc1
Branch: refs/heads/master
Commit: 2c53ccc11ffc94bbbddfaa4fe1831b12405ff229
Parents: 775cfaa
Author: jinossy <ji...@gmail.com>
Authored: Wed Dec 18 15:01:24 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Wed Dec 18 15:01:24 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../storage/BinarySerializeDeserialize.java | 257 -------------------
.../storage/BinarySerializerDeserializer.java | 257 +++++++++++++++++++
.../java/org/apache/tajo/storage/CSVFile.java | 77 ++++--
.../tajo/storage/CompressedSplitLineReader.java | 182 +++++++++++++
.../java/org/apache/tajo/storage/LazyTuple.java | 7 +-
.../org/apache/tajo/storage/LineReader.java | 10 +-
.../tajo/storage/SerializeDeserialize.java | 34 ---
.../tajo/storage/SerializerDeserializer.java | 34 +++
.../apache/tajo/storage/SplitLineReader.java | 39 +++
.../tajo/storage/TextSerializeDeserialize.java | 204 ---------------
.../storage/TextSerializerDeserializer.java | 204 +++++++++++++++
.../org/apache/tajo/storage/rcfile/RCFile.java | 12 +-
.../apache/tajo/storage/v2/RCFileScanner.java | 11 +-
.../tajo/storage/TestCompressionStorages.java | 2 +-
.../org/apache/tajo/storage/TestLazyTuple.java | 4 +-
.../org/apache/tajo/storage/TestStorages.java | 4 +-
17 files changed, 797 insertions(+), 543 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 341787c..83f39ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-424: Make serializer/deserializer configurable in CSVFile. (jinho)
+
TAJO-419: Add missing visitor methods of AlgebraVisitor and
BaseAlgebraVisitor. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializeDeserialize.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializeDeserialize.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializeDeserialize.java
deleted file mode 100644
index 27de655..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializeDeserialize.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class BinarySerializeDeserialize implements SerializeDeserialize {
-
- static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
-
- @Override
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
- throws IOException {
- byte[] bytes;
- int length = 0;
- if (datum == null || datum instanceof NullDatum) {
- return 0;
- }
-
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- case BIT:
- case CHAR:
- bytes = datum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case INT2:
- length = writeShort(out, datum.asInt2());
- break;
- case INT4:
- length = writeVLong(out, datum.asInt4());
- break;
- case INT8:
- length = writeVLong(out, datum.asInt8());
- break;
- case FLOAT4:
- length = writeFloat(out, datum.asFloat4());
- break;
- case FLOAT8:
- length = writeDouble(out, datum.asFloat8());
- break;
- case TEXT: {
- bytes = datum.asTextBytes();
- length = datum.size();
- if (length == 0) {
- bytes = INVALID_UTF__SINGLE_BYTE;
- length = INVALID_UTF__SINGLE_BYTE.length;
- }
- out.write(bytes, 0, bytes.length);
- break;
- }
- case BLOB:
- case INET4:
- case INET6:
- bytes = datum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case PROTOBUF:
- ProtobufDatum protobufDatum = (ProtobufDatum) datum;
- bytes = protobufDatum.asByteArray();
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case NULL_TYPE:
- break;
- default:
- throw new IOException("Does not support type");
- }
- return length;
- }
-
- @Override
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
- if (length == 0) return NullDatum.get();
-
- Datum datum;
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- datum = DatumFactory.createBool(bytes[offset]);
- break;
- case BIT:
- datum = DatumFactory.createBit(bytes[offset]);
- break;
- case CHAR: {
- byte[] chars = new byte[length];
- System.arraycopy(bytes, offset, chars, 0, length);
- datum = DatumFactory.createChar(chars);
- break;
- }
- case INT2:
- datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
- break;
- case INT4:
- datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
- break;
- case INT8:
- datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
- break;
- case FLOAT4:
- datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
- break;
- case FLOAT8:
- datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
- break;
- case TEXT: {
- byte[] chars = new byte[length];
- System.arraycopy(bytes, offset, chars, 0, length);
-
- if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
- datum = DatumFactory.createText(new byte[0]);
- } else {
- datum = DatumFactory.createText(chars);
- }
- break;
- }
- case PROTOBUF: {
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
- Message.Builder builder = factory.newBuilder();
- builder.mergeFrom(bytes, offset, length);
- datum = factory.createDatum(builder);
- break;
- }
- case INET4:
- datum = DatumFactory.createInet4(bytes, offset, length);
- break;
- case BLOB:
- datum = DatumFactory.createBlob(bytes, offset, length);
- break;
- default:
- datum = NullDatum.get();
- }
- return datum;
- }
-
- private byte[] shortBytes = new byte[2];
-
- public int writeShort(OutputStream out, short val) throws IOException {
- shortBytes[0] = (byte) (val >> 8);
- shortBytes[1] = (byte) val;
- out.write(shortBytes, 0, 2);
- return 2;
- }
-
- public float toFloat(byte[] bytes, int offset, int length) {
- Preconditions.checkArgument(length == 4);
-
- int val = ((bytes[offset] & 0x000000FF) << 24) +
- ((bytes[offset + 1] & 0x000000FF) << 16) +
- ((bytes[offset + 2] & 0x000000FF) << 8) +
- (bytes[offset + 3] & 0x000000FF);
- return Float.intBitsToFloat(val);
- }
-
- private byte[] floatBytes = new byte[4];
-
- public int writeFloat(OutputStream out, float f) throws IOException {
- int val = Float.floatToIntBits(f);
-
- floatBytes[0] = (byte) (val >> 24);
- floatBytes[1] = (byte) (val >> 16);
- floatBytes[2] = (byte) (val >> 8);
- floatBytes[3] = (byte) val;
- out.write(floatBytes, 0, 4);
- return floatBytes.length;
- }
-
- public double toDouble(byte[] bytes, int offset, int length) {
- Preconditions.checkArgument(length == 8);
- long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
- ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
- ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
- ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
- ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
- ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
- ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
- (long) (bytes[offset + 7] & 0x00000000000000FF);
- return Double.longBitsToDouble(val);
- }
-
- private byte[] doubleBytes = new byte[8];
-
- public int writeDouble(OutputStream out, double d) throws IOException {
- long val = Double.doubleToLongBits(d);
-
- doubleBytes[0] = (byte) (val >> 56);
- doubleBytes[1] = (byte) (val >> 48);
- doubleBytes[2] = (byte) (val >> 40);
- doubleBytes[3] = (byte) (val >> 32);
- doubleBytes[4] = (byte) (val >> 24);
- doubleBytes[5] = (byte) (val >> 16);
- doubleBytes[6] = (byte) (val >> 8);
- doubleBytes[7] = (byte) val;
- out.write(doubleBytes, 0, 8);
- return doubleBytes.length;
- }
-
- private byte[] vLongBytes = new byte[9];
-
- public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
- if (l >= -112 && l <= 127) {
- bytes[offset] = (byte) l;
- return 1;
- }
-
- int len = -112;
- if (l < 0) {
- l ^= -1L; // take one's complement'
- len = -120;
- }
-
- long tmp = l;
- while (tmp != 0) {
- tmp = tmp >> 8;
- len--;
- }
-
- bytes[offset++] = (byte) len;
- len = (len < -120) ? -(len + 120) : -(len + 112);
-
- for (int idx = len; idx != 0; idx--) {
- int shiftbits = (idx - 1) * 8;
- bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
- }
- return 1 + len;
- }
-
- public int writeVLong(OutputStream out, long l) throws IOException {
- int len = writeVLongToByteArray(vLongBytes, 0, l);
- out.write(vLongBytes, 0, len);
- return len;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
new file mode 100644
index 0000000..ed034be
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class BinarySerializerDeserializer implements SerializerDeserializer {
+
+ static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
+
+ @Override
+ public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
+ throws IOException {
+ byte[] bytes;
+ int length = 0;
+ if (datum == null || datum instanceof NullDatum) {
+ return 0;
+ }
+
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ case BIT:
+ case CHAR:
+ bytes = datum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case INT2:
+ length = writeShort(out, datum.asInt2());
+ break;
+ case INT4:
+ length = writeVLong(out, datum.asInt4());
+ break;
+ case INT8:
+ length = writeVLong(out, datum.asInt8());
+ break;
+ case FLOAT4:
+ length = writeFloat(out, datum.asFloat4());
+ break;
+ case FLOAT8:
+ length = writeDouble(out, datum.asFloat8());
+ break;
+ case TEXT: {
+ bytes = datum.asTextBytes();
+ length = datum.size();
+ if (length == 0) {
+ bytes = INVALID_UTF__SINGLE_BYTE;
+ length = INVALID_UTF__SINGLE_BYTE.length;
+ }
+ out.write(bytes, 0, bytes.length);
+ break;
+ }
+ case BLOB:
+ case INET4:
+ case INET6:
+ bytes = datum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case PROTOBUF:
+ ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+ bytes = protobufDatum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case NULL_TYPE:
+ break;
+ default:
+ throw new IOException("Does not support type");
+ }
+ return length;
+ }
+
+ @Override
+ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+ if (length == 0) return NullDatum.get();
+
+ Datum datum;
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ datum = DatumFactory.createBool(bytes[offset]);
+ break;
+ case BIT:
+ datum = DatumFactory.createBit(bytes[offset]);
+ break;
+ case CHAR: {
+ byte[] chars = new byte[length];
+ System.arraycopy(bytes, offset, chars, 0, length);
+ datum = DatumFactory.createChar(chars);
+ break;
+ }
+ case INT2:
+ datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
+ break;
+ case INT4:
+ datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
+ break;
+ case INT8:
+ datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
+ break;
+ case FLOAT4:
+ datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
+ break;
+ case FLOAT8:
+ datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
+ break;
+ case TEXT: {
+ byte[] chars = new byte[length];
+ System.arraycopy(bytes, offset, chars, 0, length);
+
+ if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
+ datum = DatumFactory.createText(new byte[0]);
+ } else {
+ datum = DatumFactory.createText(chars);
+ }
+ break;
+ }
+ case PROTOBUF: {
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
+ Message.Builder builder = factory.newBuilder();
+ builder.mergeFrom(bytes, offset, length);
+ datum = factory.createDatum(builder);
+ break;
+ }
+ case INET4:
+ datum = DatumFactory.createInet4(bytes, offset, length);
+ break;
+ case BLOB:
+ datum = DatumFactory.createBlob(bytes, offset, length);
+ break;
+ default:
+ datum = NullDatum.get();
+ }
+ return datum;
+ }
+
+ private byte[] shortBytes = new byte[2];
+
+ public int writeShort(OutputStream out, short val) throws IOException {
+ shortBytes[0] = (byte) (val >> 8);
+ shortBytes[1] = (byte) val;
+ out.write(shortBytes, 0, 2);
+ return 2;
+ }
+
+ public float toFloat(byte[] bytes, int offset, int length) {
+ Preconditions.checkArgument(length == 4);
+
+ int val = ((bytes[offset] & 0x000000FF) << 24) +
+ ((bytes[offset + 1] & 0x000000FF) << 16) +
+ ((bytes[offset + 2] & 0x000000FF) << 8) +
+ (bytes[offset + 3] & 0x000000FF);
+ return Float.intBitsToFloat(val);
+ }
+
+ private byte[] floatBytes = new byte[4];
+
+ public int writeFloat(OutputStream out, float f) throws IOException {
+ int val = Float.floatToIntBits(f);
+
+ floatBytes[0] = (byte) (val >> 24);
+ floatBytes[1] = (byte) (val >> 16);
+ floatBytes[2] = (byte) (val >> 8);
+ floatBytes[3] = (byte) val;
+ out.write(floatBytes, 0, 4);
+ return floatBytes.length;
+ }
+
+ public double toDouble(byte[] bytes, int offset, int length) {
+ Preconditions.checkArgument(length == 8);
+ long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
+ ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
+ ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
+ ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
+ ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
+ ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
+ ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
+ (long) (bytes[offset + 7] & 0x00000000000000FF);
+ return Double.longBitsToDouble(val);
+ }
+
+ private byte[] doubleBytes = new byte[8];
+
+ public int writeDouble(OutputStream out, double d) throws IOException {
+ long val = Double.doubleToLongBits(d);
+
+ doubleBytes[0] = (byte) (val >> 56);
+ doubleBytes[1] = (byte) (val >> 48);
+ doubleBytes[2] = (byte) (val >> 40);
+ doubleBytes[3] = (byte) (val >> 32);
+ doubleBytes[4] = (byte) (val >> 24);
+ doubleBytes[5] = (byte) (val >> 16);
+ doubleBytes[6] = (byte) (val >> 8);
+ doubleBytes[7] = (byte) val;
+ out.write(doubleBytes, 0, 8);
+ return doubleBytes.length;
+ }
+
+ private byte[] vLongBytes = new byte[9];
+
+ public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
+ if (l >= -112 && l <= 127) {
+ bytes[offset] = (byte) l;
+ return 1;
+ }
+
+ int len = -112;
+ if (l < 0) {
+ l ^= -1L; // take one's complement'
+ len = -120;
+ }
+
+ long tmp = l;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ bytes[offset++] = (byte) len;
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
+ }
+ return 1 + len;
+ }
+
+ public int writeVLong(OutputStream out, long l) throws IOException {
+ int len = writeVLongToByteArray(vLongBytes, 0, l);
+ out.write(vLongBytes, 0, len);
+ return len;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 6e0dc32..5d05d6f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -49,6 +49,7 @@ public class CSVFile {
public static final String DELIMITER = "csvfile.delimiter";
public static final String NULL = "csvfile.null"; //read only
+ public static final String SERDE = "csvfile.serde";
public static final String DELIMITER_DEFAULT = "|";
public static final byte LF = '\n';
public static int EOF = -1;
@@ -75,7 +76,7 @@ public class CSVFile {
private long pos = 0;
private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
- private TextSerializeDeserialize serializeDeserialize = new TextSerializeDeserialize();
+ private SerializerDeserializer serde;
public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
super(conf, schema, meta, path);
@@ -114,7 +115,7 @@ public class CSVFile {
fos = fs.create(compressedPath);
deflateFilter = codec.createOutputStream(fos, compressor);
- outputStream = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+ outputStream = new DataOutputStream(deflateFilter);
} else {
if (fs.exists(path)) {
@@ -128,6 +129,14 @@ public class CSVFile {
this.stats = new TableStatistics(this.schema);
}
+ try {
+ String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
os.reset();
pos = fos.getPos();
bufferedBytes = 0;
@@ -142,7 +151,7 @@ public class CSVFile {
for (int i = 0; i < columnNum; i++) {
datum = tuple.get(i);
- rowBytes += serializeDeserialize.serialize(schema.getColumn(i), datum, os, nullChars);
+ rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
if(columnNum - 1 > i){
os.write((byte) delimiter);
@@ -251,7 +260,7 @@ public class CSVFile {
}
}
- private final static int DEFAULT_BUFFER_SIZE = 128 * 1024;
+ private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
private char delimiter;
private FileSystem fs;
private FSDataInputStream fis;
@@ -261,16 +270,17 @@ public class CSVFile {
private Decompressor decompressor;
private Seekable filePosition;
private boolean splittable = false;
- private long startOffset, length, end, pos;
- private int currentIdx = 0, validIdx = 0;
+ private long startOffset, end, pos;
+ private int currentIdx = 0, validIdx = 0, recordCount = 0;
private int[] targetColumnIndexes;
private boolean eof = false;
private final byte[] nullChars;
- private LineReader reader;
+ private SplitLineReader reader;
private ArrayList<Long> fileOffsets = new ArrayList<Long>();
private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
- private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+ private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
+ private SerializerDeserializer serde;
@Override
public void init() throws IOException {
@@ -281,32 +291,34 @@ public class CSVFile {
}
if(fis == null) fis = fs.open(fragment.getPath());
+ recordCount = 0;
pos = startOffset = fragment.getStartKey();
- length = fragment.getEndKey();
- end = startOffset + length;
- fis.seek(startOffset);
+ end = startOffset + fragment.getEndKey();
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
- fis, decompressor, startOffset, startOffset + length,
+ fis, decompressor, startOffset, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
+ reader = new CompressedSplitLineReader(cIn, conf, null);
startOffset = cIn.getAdjustedStart();
- length = cIn.getAdjustedEnd() - startOffset;
+ end = cIn.getAdjustedEnd();
filePosition = cIn;
is = cIn;
} else {
is = new DataInputStream(codec.createInputStream(fis, decompressor));
+ reader = new SplitLineReader(is, null);
filePosition = fis;
}
} else {
+ fis.seek(startOffset);
filePosition = fis;
is = fis;
+ reader = new SplitLineReader(is, null);
}
- reader = new LineReader(is, DEFAULT_BUFFER_SIZE);
if (targets == null) {
targets = schema.toArray();
}
@@ -316,15 +328,24 @@ public class CSVFile {
targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
}
+ try {
+ String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
super.init();
Arrays.sort(targetColumnIndexes);
if (LOG.isDebugEnabled()) {
- LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + length +
+ LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
"," + fs.getFileStatus(fragment.getPath()).getLen());
}
if (startOffset != 0) {
- pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
+ startOffset += reader.readLine(new Text(), 0, maxBytesToConsume(startOffset));
+ pos = startOffset;
}
eof = false;
page();
@@ -362,12 +383,11 @@ public class CSVFile {
if(eof) return;
- while (DEFAULT_BUFFER_SIZE > bufferedSize){
+ while (DEFAULT_PAGE_SIZE >= bufferedSize){
int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
- if(ret <= 0){
- eof = true;
+ if(ret == 0){
break;
} else {
fileOffsets.add(pos);
@@ -376,9 +396,13 @@ public class CSVFile {
currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
bufferedSize += ret;
validIdx++;
+ recordCount++;
}
- if(isSplittable() && getFilePosition() > end) break;
+ if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
+ eof = true;
+ break;
+ }
}
}
@@ -386,7 +410,7 @@ public class CSVFile {
public Tuple next() throws IOException {
try {
if (currentIdx == validIdx) {
- if (isSplittable() && fragmentable() <= 0) {
+ if (eof) {
return null;
} else {
page();
@@ -405,7 +429,7 @@ public class CSVFile {
byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
currentIdx++;
- return new LazyTuple(schema, cells, offset, nullChars);
+ return new LazyTuple(schema, cells, offset, nullChars, serde);
} catch (Throwable t) {
LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
LOG.error("Tuple list current index: " + currentIdx, t);
@@ -420,7 +444,6 @@ public class CSVFile {
@Override
public void reset() throws IOException {
if (decompressor != null) {
- decompressor.reset();
CodecPool.returnDecompressor(decompressor);
decompressor = null;
}
@@ -431,11 +454,15 @@ public class CSVFile {
@Override
public void close() throws IOException {
try {
- IOUtils.cleanup(LOG, is, fis);
+ IOUtils.cleanup(LOG, reader, is, fis);
fs = null;
+ is = null;
+ fis = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSVScanner processed record:" + recordCount);
+ }
} finally {
if (decompressor != null) {
- decompressor.reset();
CodecPool.returnDecompressor(decompressor);
decompressor = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
new file mode 100644
index 0000000..4f58e68
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * Line reader for compressed splits
+ *
+ * Reading records from a compressed split is tricky, as the
+ * LineRecordReader is using the reported compressed input stream
+ * position directly to determine when a split has ended. In addition the
+ * compressed input stream is usually faking the actual byte position, often
+ * updating it only after the first compressed block after the split is
+ * accessed.
+ *
+ * Depending upon where the last compressed block of the split ends relative
+ * to the record delimiters it can be easy to accidentally drop the last
+ * record or duplicate the last record between this split and the next.
+ *
+ * Split end scenarios:
+ *
+ * 1) Last block of split ends in the middle of a record
+ * Nothing special that needs to be done here, since the compressed input
+ * stream will report a position after the split end once the record
+ * is fully read. The consumer of the next split will discard the
+ * partial record at the start of the split normally, and no data is lost
+ * or duplicated between the splits.
+ *
+ * 2) Last block of split ends in the middle of a delimiter
+ * The line reader will continue to consume bytes into the next block to
+ * locate the end of the delimiter. If a custom delimiter is being used
+ * then the next record must be read by this split or it will be dropped.
+ * The consumer of the next split will not recognize the partial
+ * delimiter at the beginning of its split and will discard it along with
+ * the next record.
+ *
+ * However for the default delimiter processing there is a special case
+ * because CR, LF, and CRLF are all valid record delimiters. If the
+ * block ends with a CR then the reader must peek at the next byte to see
+ * if it is an LF and therefore part of the same record delimiter.
+ * Peeking at the next byte is an access to the next block and triggers
+ * the stream to report the end of the split. There are two cases based
+ * on the next byte:
+ *
+ * A) The next byte is LF
+ * The split needs to end after the current record is returned. The
+ * consumer of the next split will discard the first record, which
+ * is degenerate since LF is itself a delimiter, and start consuming
+ * records after that byte. If the current split tries to read
+ * another record then the record will be duplicated between splits.
+ *
+ * B) The next byte is not LF
+ * The current record will be returned but the stream will report
+ * the split has ended due to the peek into the next block. If the
+ * next record is not read then it will be lost, as the consumer of
+ * the next split will discard it before processing subsequent
+ * records. Therefore the next record beyond the reported split end
+ * must be consumed by this split to avoid data loss.
+ *
+ * 3) Last block of split ends at the beginning of a delimiter
+ * This is equivalent to case 1, as the reader will consume bytes into
+ * the next block and trigger the end of the split. No further records
+ * should be read as the consumer of the next split will discard the
+ * (degenerate) record at the beginning of its split.
+ *
+ * 4) Last block of split ends at the end of a delimiter
+ * Nothing special needs to be done here. The reader will not start
+ * examining the bytes into the next block until the next record is read,
+ * so the stream will not report the end of the split just yet. Once the
+ * next record is read then the next block will be accessed and the
+ * stream will indicate the end of the split. The consumer of the next
+ * split will correctly discard the first record of its split, and no
+ * data is lost or duplicated.
+ *
+ * If the default delimiter is used and the block ends at a CR then this
+ * is treated as case 2 since the reader does not yet know without
+ * looking at subsequent bytes whether the delimiter has ended.
+ *
+ * NOTE: It is assumed that compressed input streams *never* return bytes from
+ * multiple compressed blocks from a single read. A stream that breaks this
+ * assumption will violate the buffering performed by this class, as it will
+ * access bytes in the next block after the split before returning all of
+ * the records from the previous block.
+ */
+
+public class CompressedSplitLineReader extends SplitLineReader {
+ SplitCompressionInputStream scin;
+ private boolean usingCRLF;
+ private boolean needAdditionalRecord = false;
+ private boolean finished = false;
+
+ public CompressedSplitLineReader(SplitCompressionInputStream in,
+ Configuration conf,
+ byte[] recordDelimiterBytes)
+ throws IOException {
+ super(in, conf, recordDelimiterBytes);
+ scin = in;
+ usingCRLF = (recordDelimiterBytes == null);
+ }
+
+ @Override
+ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+ throws IOException {
+ int bytesRead = in.read(buffer);
+
+ // If the split ended in the middle of a record delimiter then we need
+ // to read one additional record, as the consumer of the next split will
+ // not recognize the partial delimiter as a record.
+ // However if using the default delimiter and the next character is a
+ // linefeed then next split will treat it as a delimiter all by itself
+ // and the additional record read should not be performed.
+ if (inDelimiter && bytesRead > 0) {
+ if (usingCRLF) {
+ needAdditionalRecord = (buffer[0] != '\n');
+ } else {
+ needAdditionalRecord = true;
+ }
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+ throws IOException {
+ int bytesRead = 0;
+ if (!finished) {
+ // only allow at most one more record to be read after the stream
+ // reports the split ended
+ if (scin.getPos() > scin.getAdjustedEnd()) {
+ finished = true;
+ }
+
+ bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
+ , int maxBytesToConsume) throws IOException {
+ int bytesRead = 0;
+ if (!finished) {
+ // only allow at most one more record to be read after the stream
+ // reports the split ended
+ if (scin.getPos() > scin.getAdjustedEnd()) {
+ finished = true;
+ }
+
+ bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public boolean needAdditionalRecordAfterSplit() {
+ return !finished && needAdditionalRecord;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 9765d13..9967d7a 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -31,18 +31,19 @@ public class LazyTuple implements Tuple {
private byte[][] textBytes;
private Schema schema;
private byte[] nullBytes;
- private static TextSerializeDeserialize serializeDeserialize = new TextSerializeDeserialize();
+ private SerializerDeserializer serializeDeserialize;
public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
- this(schema, textBytes, offset, NullDatum.get().asTextBytes());
+ this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
}
- public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes) {
+ public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
this.schema = schema;
this.textBytes = textBytes;
this.values = new Datum[schema.getColumnNum()];
this.offset = offset;
this.nullBytes = nullBytes;
+ this.serializeDeserialize = serde;
}
public LazyTuple(LazyTuple tuple) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
index f48c482..66c610a 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
@@ -185,6 +185,10 @@ public class LineReader implements Closeable {
}
}
+ /**
+  * Refills {@code buffer} from {@code in}. This base implementation is a plain
+  * {@code in.read(buffer)} and ignores {@code inDelimiter}; the method is
+  * protected so that subclasses can override the refill step.
+  *
+  * @param in          stream to read from
+  * @param buffer      destination buffer
+  * @param inDelimiter flag supplied by the read loops (they pass
+  *                    {@code prevCharCR} or {@code ambiguousByteCount > 0})
+  * @return the value returned by {@code in.read(buffer)}
+  */
+ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+     throws IOException {
+   int bytesRead = in.read(buffer);
+   return bytesRead;
+ }
/**
* Read a line terminated by one of CR, LF, or CRLF.
*/
@@ -218,7 +222,7 @@ public class LineReader implements Closeable {
if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
}
- bufferLength = in.read(buffer);
+ bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) {
break; // EOF
}
@@ -290,7 +294,7 @@ public class LineReader implements Closeable {
if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
}
- bufferLength = in.read(buffer);
+ bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) {
break; // EOF
}
@@ -482,7 +486,7 @@ public class LineReader implements Closeable {
int startPosn = bufferPosn; // Start from previous end position
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
- bufferLength = in.read(buffer);
+ bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
if (bufferLength <= 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
break; // EOF
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializeDeserialize.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializeDeserialize.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializeDeserialize.java
deleted file mode 100644
index 53f3ad8..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializeDeserialize.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-
-public interface SerializeDeserialize {
-
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
-
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
new file mode 100644
index 0000000..333f205
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+/**
+ * Converts a single column value between a {@link Datum} and its stored byte form.
+ */
+public interface SerializerDeserializer {
+
+  /**
+   * Writes {@code datum}, a value of column {@code col}, to {@code out}.
+   *
+   * @param nullCharacters byte sequence associated with null values
+   * @return an int result; the text implementation in this package returns the
+   *         number of bytes it wrote
+   */
+  int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
+      throws IOException;
+
+  /**
+   * Builds a {@link Datum} for column {@code col} from {@code length} bytes of
+   * {@code bytes} starting at {@code offset}.
+   *
+   * @param nullCharacters byte sequence associated with null values
+   */
+  Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters)
+      throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
new file mode 100644
index 0000000..3579674
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link LineReader} that can additionally report whether one more record
+ * has to be read after the end of a split. This base class always answers
+ * {@code false}; subclasses may override the answer.
+ */
+public class SplitLineReader extends LineReader {
+
+  /** Delegates to {@code LineReader(InputStream, byte[])}. */
+  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
+    super(in, recordDelimiterBytes);
+  }
+
+  /** Delegates to {@code LineReader(InputStream, Configuration, byte[])}. */
+  public SplitLineReader(
+      InputStream in,
+      Configuration conf,
+      byte[] recordDelimiterBytes) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+  }
+
+  /**
+   * @return {@code false} in this base implementation
+   */
+  public boolean needAdditionalRecordAfterSplit() {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
deleted file mode 100644
index 6afd9b9..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.protobuf.Message;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-//Compatibility with Apache Hive
-public class TextSerializeDeserialize implements SerializeDeserialize {
- public static byte[] trueBytes = "true".getBytes();
- public static byte[] falseBytes = "false".getBytes();
- private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
-
-
- @Override
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
-
- byte[] bytes;
- int length = 0;
- TajoDataTypes.DataType dataType = col.getDataType();
-
- if (datum == null || datum instanceof NullDatum) {
- switch (dataType.getType()) {
- case CHAR:
- case TEXT:
- length = nullCharacters.length;
- out.write(nullCharacters);
- break;
- default:
- break;
- }
- return length;
- }
-
- switch (dataType.getType()) {
- case BOOLEAN:
- out.write(datum.asBool() ? trueBytes : falseBytes);
- length = trueBytes.length;
- break;
- case CHAR:
- byte[] pad = new byte[dataType.getLength() - datum.size()];
- bytes = datum.asTextBytes();
- out.write(bytes);
- out.write(pad);
- length = bytes.length + pad.length;
- break;
- case TEXT:
- case BIT:
- case INT2:
- case INT4:
- case INT8:
- case FLOAT4:
- case FLOAT8:
- case INET4:
- case DATE:
- case TIMESTAMP:
- bytes = datum.asTextBytes();
- length = bytes.length;
- out.write(bytes);
- break;
- case INET6:
- case BLOB:
- bytes = Base64.encodeBase64(datum.asByteArray(), false);
- length = bytes.length;
- out.write(bytes, 0, length);
- break;
- case PROTOBUF:
- ProtobufDatum protobuf = (ProtobufDatum) datum;
- byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
- length = protoBytes.length;
- out.write(protoBytes, 0, protoBytes.length);
- break;
- case NULL_TYPE:
- default:
- break;
- }
- return length;
- }
-
- @Override
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
-
- Datum datum;
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
- break;
- case BIT:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length)));
- break;
- case CHAR:
- datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createChar(new String(bytes, offset, length).trim());
- break;
- case INT2:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createInt2(new String(bytes, offset, length));
- break;
- case INT4:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createInt4(new String(bytes, offset, length));
- break;
- case INT8:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createInt8(new String(bytes, offset, length));
- break;
- case FLOAT4:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createFloat4(new String(bytes, offset, length));
- break;
- case FLOAT8:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createFloat8(new String(bytes, offset, length));
- break;
- case TEXT: {
- byte[] chars = new byte[length];
- System.arraycopy(bytes, offset, chars, 0, length);
- datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createText(chars);
- break;
- }
- case DATE:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createDate(new String(bytes, offset, length));
- break;
- case TIMESTAMP:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createTimeStamp(new String(bytes, offset, length));
- break;
- case PROTOBUF: {
- if (isNull(bytes, offset, length, nullCharacters)) {
- datum = NullDatum.get();
- } else {
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
- Message.Builder builder = factory.newBuilder();
- try {
- byte[] protoBytes = new byte[length];
- System.arraycopy(bytes, offset, protoBytes, 0, length);
- protobufJsonFormat.merge(protoBytes, builder);
- datum = factory.createDatum(builder.build());
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
- break;
- }
- case INET4:
- datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
- : DatumFactory.createInet4(new String(bytes, offset, length));
- break;
- case BLOB: {
- if (isNull(bytes, offset, length, nullCharacters)) {
- datum = NullDatum.get();
- } else {
- byte[] blob = new byte[length];
- System.arraycopy(bytes, offset, blob, 0, length);
- datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
- }
- break;
- }
- default:
- datum = NullDatum.get();
- break;
- }
- return datum;
- }
-
- private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
- return length == 0 || ((length == nullBytes.length)
- && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
- }
-
- private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) {
- return length > 0 && length == nullBytes.length
- && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
new file mode 100644
index 0000000..b80b461
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+//Compatibility with Apache Hive
+public class TextSerializerDeserializer implements SerializerDeserializer {
+ public static byte[] trueBytes = "true".getBytes();
+ public static byte[] falseBytes = "false".getBytes();
+ private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+
+
+ /**
+  * Writes the text form of {@code datum} to {@code out} and returns the number
+  * of bytes written.
+  *
+  * <p>A null value ({@code null} or a {@link NullDatum}) is written as
+  * {@code nullCharacters} for CHAR and TEXT columns and as nothing (length 0)
+  * for every other type. INET6 and BLOB values are Base64 encoded; PROTOBUF
+  * values are printed through {@code ProtobufJsonFormat}.
+  *
+  * @param col            column whose data type selects the encoding
+  * @param datum          value to write; may be null
+  * @param out            destination stream
+  * @param nullCharacters bytes written for a null CHAR/TEXT value
+  * @return number of bytes written to {@code out}
+  * @throws IOException if writing to {@code out} fails
+  */
+ @Override
+ public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
+
+   byte[] bytes;
+   int length = 0;
+   TajoDataTypes.DataType dataType = col.getDataType();
+
+   if (datum == null || datum instanceof NullDatum) {
+     switch (dataType.getType()) {
+     case CHAR:
+     case TEXT:
+       length = nullCharacters.length;
+       out.write(nullCharacters);
+       break;
+     default:
+       break;
+     }
+     return length;
+   }
+
+   switch (dataType.getType()) {
+   case BOOLEAN:
+     // Report the length of what was actually written: "false" is one byte
+     // longer than "true", so trueBytes.length is wrong for false values.
+     bytes = datum.asBool() ? trueBytes : falseBytes;
+     out.write(bytes);
+     length = bytes.length;
+     break;
+   case CHAR:
+     // Pad with zero bytes up to the declared column length.
+     // NOTE(review): a datum larger than the declared length makes the pad
+     // size negative and this allocation throw - confirm callers prevent that.
+     byte[] pad = new byte[dataType.getLength() - datum.size()];
+     bytes = datum.asTextBytes();
+     out.write(bytes);
+     out.write(pad);
+     length = bytes.length + pad.length;
+     break;
+   case TEXT:
+   case BIT:
+   case INT2:
+   case INT4:
+   case INT8:
+   case FLOAT4:
+   case FLOAT8:
+   case INET4:
+   case DATE:
+   case TIMESTAMP:
+     bytes = datum.asTextBytes();
+     length = bytes.length;
+     out.write(bytes);
+     break;
+   case INET6:
+   case BLOB:
+     bytes = Base64.encodeBase64(datum.asByteArray(), false);
+     length = bytes.length;
+     out.write(bytes, 0, length);
+     break;
+   case PROTOBUF:
+     ProtobufDatum protobuf = (ProtobufDatum) datum;
+     byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
+     length = protoBytes.length;
+     out.write(protoBytes, 0, protoBytes.length);
+     break;
+   case NULL_TYPE:
+   default:
+     break;
+   }
+   return length;
+ }
+
+ /**
+  * Parses {@code length} bytes of {@code bytes}, starting at {@code offset},
+  * into a {@link Datum} of the column's data type.
+  *
+  * <p>For CHAR and TEXT a value is null only when it equals
+  * {@code nullCharacters} exactly, so an empty value stays an empty string.
+  * For the other handled types an empty value is also treated as null. Types
+  * without a case below (including INET6, which {@code serialize} does write)
+  * yield {@link NullDatum}.
+  *
+  * @param col            column whose data type selects the decoding
+  * @param bytes          buffer holding the encoded value
+  * @param offset         index of the first byte of the value
+  * @param length         number of bytes of the value
+  * @param nullCharacters byte sequence that denotes a null value
+  * @return the decoded datum, or {@link NullDatum} for a null value
+  * @throws RuntimeException if a PROTOBUF value cannot be parsed
+  */
+ @Override
+ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+
+   Datum datum;
+   switch (col.getDataType().getType()) {
+   case BOOLEAN:
+     // Only the first byte is inspected: 't' or 'T' means true, anything else false.
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
+     break;
+   case BIT:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length)));
+     break;
+   case CHAR:
+     // trim() drops leading/trailing chars <= U+0020, which includes the
+     // zero-byte padding that serialize() appends to CHAR values.
+     datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createChar(new String(bytes, offset, length).trim());
+     break;
+   case INT2:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createInt2(new String(bytes, offset, length));
+     break;
+   case INT4:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createInt4(new String(bytes, offset, length));
+     break;
+   case INT8:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createInt8(new String(bytes, offset, length));
+     break;
+   case FLOAT4:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createFloat4(new String(bytes, offset, length));
+     break;
+   case FLOAT8:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createFloat8(new String(bytes, offset, length));
+     break;
+   case TEXT: {
+     if (isNullText(bytes, offset, length, nullCharacters)) {
+       datum = NullDatum.get();
+     } else {
+       // Copy only when the value is not null, so null cells cost no allocation.
+       byte[] chars = new byte[length];
+       System.arraycopy(bytes, offset, chars, 0, length);
+       datum = DatumFactory.createText(chars);
+     }
+     break;
+   }
+   case DATE:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createDate(new String(bytes, offset, length));
+     break;
+   case TIMESTAMP:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createTimeStamp(new String(bytes, offset, length));
+     break;
+   case PROTOBUF: {
+     if (isNull(bytes, offset, length, nullCharacters)) {
+       datum = NullDatum.get();
+     } else {
+       ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+       Message.Builder builder = factory.newBuilder();
+       try {
+         byte[] protoBytes = new byte[length];
+         System.arraycopy(bytes, offset, protoBytes, 0, length);
+         protobufJsonFormat.merge(protoBytes, builder);
+         datum = factory.createDatum(builder.build());
+       } catch (IOException e) {
+         // The cause travels with the rethrown exception; printing the stack
+         // trace here as well would report the same failure twice.
+         throw new RuntimeException(e);
+       }
+     }
+     break;
+   }
+   case INET4:
+     datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+         : DatumFactory.createInet4(new String(bytes, offset, length));
+     break;
+   case BLOB: {
+     if (isNull(bytes, offset, length, nullCharacters)) {
+       datum = NullDatum.get();
+     } else {
+       byte[] blob = new byte[length];
+       System.arraycopy(bytes, offset, blob, 0, length);
+       datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
+     }
+     break;
+   }
+   default:
+     datum = NullDatum.get();
+     break;
+   }
+   return datum;
+ }
+
+ /**
+  * Null test for non-text types: an empty value is null, and so is a value
+  * whose bytes equal {@code nullBytes}.
+  */
+ private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
+   if (length == 0) {
+     return true;
+   }
+   if (length != nullBytes.length) {
+     return false;
+   }
+   return Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
+ }
+
+ /**
+  * Null test for CHAR/TEXT: only a non-empty value whose bytes equal
+  * {@code nullBytes} is null; an empty value is not.
+  */
+ private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) {
+   if (length <= 0) {
+     return false;
+   }
+   if (length != nullBytes.length) {
+     return false;
+   }
+   return Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 74e2192..9fe0bce 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -590,7 +590,7 @@ public class RCFile {
private ColumnBuffer[] columnBuffers = null;
boolean useNewMagic = true;
private byte[] nullChars;
- SerializeDeserialize serde;
+ SerializerDeserializer serde;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
@@ -742,9 +742,9 @@ public class RCFile {
metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text("" + columnNumber));
- String serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+ String serdeClass = this.meta.getOption(SERDE, BinarySerializerDeserializer.class.getName());
try {
- serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
@@ -1152,7 +1152,7 @@ public class RCFile {
private LongWritable rowId;
private byte[] nullChars;
- private SerializeDeserialize serde;
+ private SerializerDeserializer serde;
public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
final FileFragment fragment) throws IOException {
@@ -1347,9 +1347,9 @@ public class RCFile {
if(text != null && !text.toString().isEmpty()){
serdeClass = text.toString();
} else{
- serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+ serdeClass = this.meta.getOption(SERDE, BinarySerializerDeserializer.class.getName());
}
- serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
index 0b63d20..b7a5929 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
@@ -30,13 +30,10 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.BinarySerializeDeserialize;
import org.apache.tajo.storage.rcfile.BytesRefArrayWritable;
import org.apache.tajo.storage.rcfile.ColumnProjectionUtils;
-import org.apache.tajo.storage.SerializeDeserialize;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
@@ -57,7 +54,7 @@ public class RCFileScanner extends FileScannerV2 {
private ScheduledInputStream sin;
private boolean first = true;
private int maxBytesPerSchedule;
- SerializeDeserialize serde;
+ SerializerDeserializer serde;
byte[] nullChars;
public RCFileScanner(final Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
@@ -206,9 +203,9 @@ public class RCFileScanner extends FileScannerV2 {
if(text != null && !text.toString().isEmpty()){
serdeClass = text.toString();
} else{
- serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+ serdeClass = this.meta.getOption(SERDE, BinarySerializerDeserializer.class.getName());
}
- serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 08a181a..bec1556 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -178,7 +178,7 @@ public class TestCompressionStorages {
TableMeta meta = CatalogUtil.newTableMeta(storeType);
meta.putOption("compression.codec", codec.getCanonicalName());
- meta.putOption("rcfile.serde", "org.apache.tajo.storage.TextSerializeDeserialize");
+ meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
index 811a191..fecbe89 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -34,6 +34,7 @@ public class TestLazyTuple {
Schema schema;
byte[][] textRow;
byte[] nullbytes;
+ SerializerDeserializer serde;
@Before
public void setUp() {
@@ -69,12 +70,13 @@ public class TestLazyTuple {
sb.append(new String(nullbytes)).append('|');
sb.append(NullDatum.get());
textRow = Bytes.splitPreserveAllTokens(sb.toString().getBytes(), '|');
+ serde = new TextSerializerDeserializer();
}
@Test
public void testGetDatum() {
- LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes);
+ LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
assertEquals(DatumFactory.createBool(true), t1.get(0));
assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
assertEquals(DatumFactory.createChar("str"), t1.get(2));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 2faae7f..16b370c 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -268,7 +268,7 @@ public class TestStorages {
Options options = new Options();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.putOption(RCFile.SERDE, TextSerializeDeserialize.class.getName());
+ meta.putOption(RCFile.SERDE, TextSerializerDeserializer.class.getName());
Path tablePath = new Path(testDir, "testVariousTypes.data");
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -331,7 +331,7 @@ public class TestStorages {
Options options = new Options();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.putOption(RCFile.SERDE, BinarySerializeDeserialize.class.getName());
+ meta.putOption(RCFile.SERDE, BinarySerializerDeserializer.class.getName());
Path tablePath = new Path(testDir, "testVariousTypes.data");
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);