You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/16 00:42:24 UTC
[02/13] tajo git commit: TAJO-1450: Encapsulate Datum in Tuple.
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
index cba9ee0..b25b2fd 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -110,8 +110,8 @@ public class BaseTupleComparator extends TupleComparator implements ProtoObject<
@Override
public int compare(Tuple tuple1, Tuple tuple2) {
for (int i = 0; i < sortKeyIds.length; i++) {
- left = tuple1.get(sortKeyIds[i]);
- right = tuple2.get(sortKeyIds[i]);
+ left = tuple1.asDatum(sortKeyIds[i]);
+ right = tuple2.asDatum(sortKeyIds[i]);
if (left.isNull() || right.isNull()) {
if (!left.equals(right)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
index a3b8da8..2cccb69 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
import com.google.common.base.Preconditions;
import com.google.protobuf.Message;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.*;
import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
import org.apache.tajo.util.Bytes;
@@ -33,50 +34,58 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
+ private Schema schema;
+
@Override
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
+ public void init(Schema schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters)
throws IOException {
- byte[] bytes;
- int length = 0;
- if (datum == null || datum instanceof NullDatum) {
+ if (tuple.isBlankOrNull(index)) {
return 0;
}
- switch (col.getDataType().getType()) {
+ byte[] bytes;
+ int length = 0;
+ Column column = schema.getColumn(index);
+ switch (column.getDataType().getType()) {
case BOOLEAN:
case BIT:
- bytes = datum.asByteArray();
+ bytes = tuple.getBytes(index);
length = bytes.length;
out.write(bytes, 0, length);
break;
case CHAR:
- bytes = datum.asByteArray();
+ bytes = tuple.getBytes(index);
length = bytes.length;
- if (length > col.getDataType().getLength()) {
- throw new ValueTooLongForTypeCharactersException(col.getDataType().getLength());
+ if (length > column.getDataType().getLength()) {
+ throw new ValueTooLongForTypeCharactersException(column.getDataType().getLength());
}
out.write(bytes, 0, length);
break;
case INT2:
- length = writeShort(out, datum.asInt2());
+ length = writeShort(out, tuple.getInt2(index));
break;
case INT4:
- length = writeVLong(out, datum.asInt4());
+ length = writeVLong(out, tuple.getInt4(index));
break;
case INT8:
- length = writeVLong(out, datum.asInt8());
+ length = writeVLong(out, tuple.getInt8(index));
break;
case FLOAT4:
- length = writeFloat(out, datum.asFloat4());
+ length = writeFloat(out, tuple.getFloat4(index));
break;
case FLOAT8:
- length = writeDouble(out, datum.asFloat8());
+ length = writeDouble(out, tuple.getFloat8(index));
break;
case TEXT: {
- bytes = datum.asTextBytes();
- length = datum.size();
+ bytes = tuple.getTextBytes(index);
+ length = bytes.length;
if (length == 0) {
bytes = INVALID_UTF__SINGLE_BYTE;
length = INVALID_UTF__SINGLE_BYTE.length;
@@ -87,12 +96,12 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
case BLOB:
case INET4:
case INET6:
- bytes = datum.asByteArray();
+ bytes = tuple.getBytes(index);
length = bytes.length;
out.write(bytes, 0, length);
break;
case PROTOBUF:
- ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+ ProtobufDatum protobufDatum = (ProtobufDatum) tuple.getProtobufDatum(index);
bytes = protobufDatum.asByteArray();
length = bytes.length;
out.write(bytes, 0, length);
@@ -106,11 +115,12 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
}
@Override
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+ public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
if (length == 0) return NullDatum.get();
Datum datum;
- switch (col.getDataType().getType()) {
+ Column column = schema.getColumn(index);
+ switch (column.getDataType().getType()) {
case BOOLEAN:
datum = DatumFactory.createBool(bytes[offset]);
break;
@@ -150,7 +160,7 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
break;
}
case PROTOBUF: {
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(column.getDataType().getCode());
Message.Builder builder = factory.newBuilder();
builder.mergeFrom(bytes, offset, length);
datum = factory.createDatum(builder);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
index a5561ed..ed53832 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -22,10 +22,12 @@
package org.apache.tajo.storage;
import com.google.common.base.Preconditions;
+import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.IntervalDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.util.datetime.TimeMeta;
/**
* An instance of FrameTuple is an immutable tuple.
@@ -82,13 +84,19 @@ public class FrameTuple implements Tuple, Cloneable {
}
@Override
- public boolean isNull(int fieldid) {
- return get(fieldid).isNull();
+ public boolean isBlank(int fieldid) {
+ return asDatum(fieldid) == null;
}
@Override
- public boolean isNotNull(int fieldid) {
- return !isNull(fieldid);
+ public boolean isBlankOrNull(int fieldid) {
+ Datum datum = asDatum(fieldid);
+ return datum == null || datum.isNull();
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ throw new UnsupportedException();
}
@Override
@@ -102,13 +110,18 @@ public class FrameTuple implements Tuple, Cloneable {
}
@Override
- public void put(int fieldId, Datum[] values) {
+ public void put(Datum[] values) {
throw new UnsupportedException();
}
@Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException();
+ public TajoDataTypes.Type type(int fieldId) {
+ return asDatum(fieldId).type(); // report the type of the datum held by the left/right side instead of a null stub
+ }
+
+ @Override
+ public int size(int fieldId) {
+ return asDatum(fieldId).size(); // real datum size; a constant 0 would skew CHAR padding in the text serializer
}
@Override
@@ -122,85 +135,90 @@ public class FrameTuple implements Tuple, Cloneable {
}
@Override
- public void put(Datum [] values) {
- throw new UnsupportedException();
- }
-
- @Override
- public Datum get(int fieldId) {
+ public Datum asDatum(int fieldId) {
Preconditions.checkArgument(fieldId < size,
"Out of field access: " + fieldId);
if (fieldId < leftSize) {
- return left.get(fieldId);
+ return left.asDatum(fieldId);
} else {
- return right.get(fieldId - leftSize);
+ return right.asDatum(fieldId - leftSize);
}
}
@Override
public boolean getBool(int fieldId) {
- return get(fieldId).asBool();
+ return asDatum(fieldId).asBool();
}
@Override
public byte getByte(int fieldId) {
- return get(fieldId).asByte();
+ return asDatum(fieldId).asByte();
}
@Override
public char getChar(int fieldId) {
- return get(fieldId).asChar();
+ return asDatum(fieldId).asChar();
}
@Override
public byte [] getBytes(int fieldId) {
- return get(fieldId).asByteArray();
+ return asDatum(fieldId).asByteArray();
+ }
+
+ @Override
+ public byte[] getTextBytes(int fieldId) {
+ return asDatum(fieldId).asTextBytes();
}
@Override
public short getInt2(int fieldId) {
- return get(fieldId).asInt2();
+ return asDatum(fieldId).asInt2();
}
@Override
public int getInt4(int fieldId) {
- return get(fieldId).asInt4();
+ return asDatum(fieldId).asInt4();
}
@Override
public long getInt8(int fieldId) {
- return get(fieldId).asInt8();
+ return asDatum(fieldId).asInt8();
}
@Override
public float getFloat4(int fieldId) {
- return get(fieldId).asFloat4();
+ return asDatum(fieldId).asFloat4();
}
@Override
public double getFloat8(int fieldId) {
- return get(fieldId).asFloat8();
+ return asDatum(fieldId).asFloat8();
}
@Override
public String getText(int fieldId) {
- return get(fieldId).asChars();
+ return asDatum(fieldId).asChars();
+ }
+
+ @Override
+ public TimeMeta getTimeDate(int fieldId) {
+ return asDatum(fieldId).asTimeMeta(); // delegate to the datum rather than returning null to TIME/TIMESTAMP callers
}
@Override
public ProtobufDatum getProtobufDatum(int fieldId) {
- return (ProtobufDatum) get(fieldId);
+ return (ProtobufDatum) asDatum(fieldId);
}
@Override
public IntervalDatum getInterval(int fieldId) {
- return (IntervalDatum) get(fieldId);
+ return (IntervalDatum) asDatum(fieldId);
}
@Override
public char [] getUnicodeChars(int fieldId) {
- return get(fieldId).asUnicodeChars();
+ return asDatum(fieldId).asUnicodeChars();
}
@Override
@@ -228,7 +246,7 @@ public class FrameTuple implements Tuple, Cloneable {
}
str.append(i)
.append("=>")
- .append(get(i));
+ .append(asDatum(i));
}
}
str.append(")");
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
index bfbe478..7bfc166 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -19,11 +19,13 @@
package org.apache.tajo.storage;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.IntervalDatum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.util.datetime.TimeMeta;
import java.util.Arrays;
@@ -36,7 +38,7 @@ public class LazyTuple implements Tuple, Cloneable {
private SerializerDeserializer serializeDeserialize;
public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
- this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
+ this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer(schema));
}
public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
@@ -68,13 +70,19 @@ public class LazyTuple implements Tuple, Cloneable {
}
@Override
- public boolean isNull(int fieldid) {
- return get(fieldid).isNull();
+ public boolean isBlank(int fieldid) {
+ return get(fieldid) == null;
}
@Override
- public boolean isNotNull(int fieldid) {
- return !isNull(fieldid);
+ public boolean isBlankOrNull(int fieldid) {
+ Datum datum = get(fieldid);
+ return datum == null || datum.isNull();
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ this.put(fieldId, tuple.asDatum(fieldId));
}
@Override
@@ -95,31 +103,28 @@ public class LazyTuple implements Tuple, Cloneable {
}
@Override
- public void put(int fieldId, Datum[] values) {
- for (int i = fieldId, j = 0; j < values.length; i++, j++) {
- this.values[i] = values[j];
- }
- this.textBytes = new byte[values.length][];
+ public void put(Datum[] values) {
+ System.arraycopy(values, 0, this.values, 0, this.values.length);
}
@Override
- public void put(int fieldId, Tuple tuple) {
- for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
- values[i] = tuple.get(j);
- textBytes[i] = null;
- }
+ public Datum asDatum(int fieldId) {
+ return get(fieldId);
}
@Override
- public void put(Datum[] values) {
- System.arraycopy(values, 0, this.values, 0, size());
- this.textBytes = new byte[values.length][];
+ public TajoDataTypes.Type type(int fieldId) {
+ return get(fieldId).type();
+ }
+
+ @Override
+ public int size(int fieldId) {
+ return get(fieldId).size();
}
//////////////////////////////////////////////////////
// Getter
//////////////////////////////////////////////////////
- @Override
public Datum get(int fieldId) {
if (values[fieldId] != null)
return values[fieldId];
@@ -127,7 +132,7 @@ public class LazyTuple implements Tuple, Cloneable {
values[fieldId] = NullDatum.get(); // split error. (col : 3, separator: ',', row text: "a,")
} else if (textBytes[fieldId] != null) {
try {
- values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
+ values[fieldId] = serializeDeserialize.deserialize(fieldId,
textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
} catch (Exception e) {
values[fieldId] = NullDatum.get();
@@ -170,6 +175,11 @@ public class LazyTuple implements Tuple, Cloneable {
}
@Override
+ public byte[] getTextBytes(int fieldId) {
+ return get(fieldId).asTextBytes();
+ }
+
+ @Override
public short getInt2(int fieldId) {
return get(fieldId).asInt2();
}
@@ -200,6 +210,11 @@ public class LazyTuple implements Tuple, Cloneable {
}
@Override
+ public TimeMeta getTimeDate(int fieldId) {
+ return get(fieldId).asTimeMeta();
+ }
+
+ @Override
public ProtobufDatum getProtobufDatum(int fieldId) {
throw new UnsupportedException();
}
@@ -214,7 +229,9 @@ public class LazyTuple implements Tuple, Cloneable {
return get(fieldId).asUnicodeChars();
}
+ @Override
public String toString() {
+ // todo this changes internal state, which causes funny result in GUI debugging
boolean first = true;
StringBuilder str = new StringBuilder();
str.append("(");
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 256bc78..6643d45 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -47,7 +47,7 @@ public class RowStoreUtil {
public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
out.clear();
for (int idx = 0; idx < targetIds.length; idx++) {
- out.put(idx, in.get(targetIds[idx]));
+ out.put(idx, in.asDatum(targetIds[idx]));
}
return out;
}
@@ -76,7 +76,7 @@ public class RowStoreUtil {
public Tuple toTuple(byte [] bytes) {
nullFlags.clear();
ByteBuffer bb = ByteBuffer.wrap(bytes);
- Tuple tuple = new VTuple(schema.size());
+ VTuple tuple = new VTuple(schema.size());
Column col;
TajoDataTypes.DataType type;
@@ -189,7 +189,7 @@ public class RowStoreUtil {
bb.position(headerSize);
Column col;
for (int i = 0; i < schema.size(); i++) {
- if (tuple.isNull(i)) {
+ if (tuple.isBlankOrNull(i)) {
nullFlags.set(i);
continue;
}
@@ -200,15 +200,15 @@ public class RowStoreUtil {
nullFlags.set(i);
break;
case BOOLEAN:
- bb.put(tuple.get(i).asByte());
+ bb.put(tuple.getByte(i));
break;
case BIT:
- bb.put(tuple.get(i).asByte());
+ bb.put(tuple.getByte(i));
break;
case CHAR:
int charSize = col.getDataType().getLength();
byte [] _char = new byte[charSize];
- byte [] src = tuple.get(i).asByteArray();
+ byte [] src = tuple.getBytes(i);
if (charSize < src.length) {
throw new ValueTooLongForTypeCharactersException(charSize);
}
@@ -217,48 +217,48 @@ public class RowStoreUtil {
bb.put(_char);
break;
case INT2:
- bb.putShort(tuple.get(i).asInt2());
+ bb.putShort(tuple.getInt2(i));
break;
case INT4:
- bb.putInt(tuple.get(i).asInt4());
+ bb.putInt(tuple.getInt4(i));
break;
case INT8:
- bb.putLong(tuple.get(i).asInt8());
+ bb.putLong(tuple.getInt8(i));
break;
case FLOAT4:
- bb.putFloat(tuple.get(i).asFloat4());
+ bb.putFloat(tuple.getFloat4(i));
break;
case FLOAT8:
- bb.putDouble(tuple.get(i).asFloat8());
+ bb.putDouble(tuple.getFloat8(i));
break;
case TEXT:
- byte[] _string = tuple.get(i).asByteArray();
+ byte[] _string = tuple.getBytes(i);
bb.putInt(_string.length);
bb.put(_string);
break;
case DATE:
- bb.putInt(tuple.get(i).asInt4());
+ bb.putInt(tuple.getInt4(i));
break;
case TIME:
case TIMESTAMP:
- bb.putLong(tuple.get(i).asInt8());
+ bb.putLong(tuple.getInt8(i));
break;
case INTERVAL:
- IntervalDatum interval = (IntervalDatum) tuple.get(i);
+ IntervalDatum interval = (IntervalDatum) tuple.getInterval(i);
bb.putInt(interval.getMonths());
bb.putLong(interval.getMilliSeconds());
break;
case BLOB:
- byte[] bytes = tuple.get(i).asByteArray();
+ byte[] bytes = tuple.getBytes(i);
bb.putInt(bytes.length);
bb.put(bytes);
break;
case INET4:
- byte[] ipBytes = tuple.get(i).asByteArray();
+ byte[] ipBytes = tuple.getBytes(i);
bb.put(ipBytes);
break;
case INET6:
- bb.put(tuple.get(i).asByteArray());
+ bb.put(tuple.getBytes(i));
break;
default:
throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
@@ -283,7 +283,7 @@ public class RowStoreUtil {
Column col;
for (int i = 0; i < schema.size(); i++) {
- if (tuple.isNull(i)) {
+ if (tuple.isBlankOrNull(i)) {
continue;
}
@@ -315,11 +315,11 @@ public class RowStoreUtil {
break;
case TEXT:
case BLOB:
- size += (4 + tuple.get(i).asByteArray().length);
+ size += (4 + tuple.getBytes(i).length);
break;
case INET4:
case INET6:
- size += tuple.get(i).asByteArray().length;
+ size += tuple.getBytes(i).length;
break;
default:
throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
@@ -340,7 +340,7 @@ public class RowStoreUtil {
writer.startRow();
for (int i = 0; i < writer.dataTypes().length; i++) {
- if (tuple.isNull(i)) {
+ if (tuple.isBlankOrNull(i)) {
writer.skipField();
continue;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
index 564a9f5..44cd214 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -19,6 +19,7 @@
package org.apache.tajo.storage;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
import java.io.IOException;
@@ -27,8 +28,10 @@ import java.io.OutputStream;
@Deprecated
public interface SerializerDeserializer {
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
+ public void init(Schema schema);
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
+ public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters) throws IOException;
+
+ public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
index a2c08de..c101b0b 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -20,13 +20,13 @@ package org.apache.tajo.storage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
/**
* This class is not thread-safe.
@@ -34,8 +34,8 @@ import org.apache.tajo.datum.NullDatum;
public class TableStatistics {
private static final Log LOG = LogFactory.getLog(TableStatistics.class);
private Schema schema;
- private Tuple minValues;
- private Tuple maxValues;
+ private VTuple minValues;
+ private VTuple maxValues;
private long [] numNulls;
private long numRows = 0;
private long numBytes = 0;
@@ -81,12 +81,17 @@ public class TableStatistics {
return this.numBytes;
}
- public void analyzeField(int idx, Datum datum) {
- if (datum instanceof NullDatum) {
+ public void analyzeNull(int idx) {
+ numNulls[idx]++;
+ }
+
+ public void analyzeField(int idx, Tuple tuple) {
+ if (tuple.isBlankOrNull(idx)) {
numNulls[idx]++;
return;
}
+ Datum datum = tuple.asDatum(idx);
if (comparable[idx]) {
if (!maxValues.contains(idx) ||
maxValues.get(idx).compareTo(datum) < 0) {
@@ -102,21 +107,21 @@ public class TableStatistics {
public TableStats getTableStat() {
TableStats stat = new TableStats();
- ColumnStats columnStats;
for (int i = 0; i < schema.size(); i++) {
- columnStats = new ColumnStats(schema.getColumn(i));
+ Column column = schema.getColumn(i);
+ ColumnStats columnStats = new ColumnStats(column);
columnStats.setNumNulls(numNulls[i]);
- if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) {
+ if (minValues.isBlank(i) || column.getDataType().getType() == minValues.type(i)) {
columnStats.setMinValue(minValues.get(i));
} else {
- LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
- ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
+ LOG.warn("Wrong statistics column type (" + minValues.type(i) +
+ ", expected=" + column.getDataType().getType() + ")");
}
- if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) {
+ if (maxValues.isBlank(i) || column.getDataType().getType() == maxValues.type(i)) { // guard on maxValues itself, as the pre-change code did
columnStats.setMaxValue(maxValues.get(i));
} else {
- LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() +
- ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
+ LOG.warn("Wrong statistics column type (" + maxValues.type(i) +
+ ", expected=" + column.getDataType().getType() + ")");
}
stat.addColumnStat(columnStats);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
index 954b62d..1ec13bc 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
import com.google.protobuf.Message;
import org.apache.commons.codec.binary.Base64;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.*;
import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
@@ -39,39 +40,48 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
public static final byte[] falseBytes = "false".getBytes();
private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+ public TextSerializerDeserializer() {}
+
+ public TextSerializerDeserializer(Schema schema) {
+ init(schema);
+ }
+
+ private Schema schema;
+
@Override
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
+ public void init(Schema schema) {
+ this.schema = schema;
+ }
- byte[] bytes;
- int length = 0;
- TajoDataTypes.DataType dataType = col.getDataType();
-
- if (datum == null || datum instanceof NullDatum) {
- switch (dataType.getType()) {
- case CHAR:
- case TEXT:
- length = nullCharacters.length;
- out.write(nullCharacters);
- break;
- default:
- break;
+ @Override
+ public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters)
+ throws IOException {
+
+ Column col = schema.getColumn(index);
+ TajoDataTypes.Type type = col.getDataType().getType();
+ if (tuple.isBlankOrNull(index)) {
+ if (type == TajoDataTypes.Type.CHAR || type == TajoDataTypes.Type.TEXT) {
+ out.write(nullCharacters);
+ return nullCharacters.length;
}
- return length;
+ return 0;
}
- switch (dataType.getType()) {
+ byte[] bytes;
+ int length = 0;
+ switch (type) {
case BOOLEAN:
- out.write(datum.asBool() ? trueBytes : falseBytes);
+ out.write(tuple.getBool(index) ? trueBytes : falseBytes);
length = trueBytes.length;
break;
case CHAR:
- int size = dataType.getLength() - datum.size();
+ int size = col.getDataType().getLength() - tuple.size(index);
if (size < 0){
- throw new ValueTooLongForTypeCharactersException(dataType.getLength());
+ throw new ValueTooLongForTypeCharactersException(col.getDataType().getLength());
}
byte[] pad = new byte[size];
- bytes = datum.asTextBytes();
+ bytes = tuple.getTextBytes(index);
out.write(bytes);
out.write(pad);
length = bytes.length + pad.length;
@@ -86,28 +96,28 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
case INET4:
case DATE:
case INTERVAL:
- bytes = datum.asTextBytes();
+ bytes = tuple.getTextBytes(index);
length = bytes.length;
out.write(bytes);
break;
case TIME:
- bytes = ((TimeDatum)datum).asChars(TimeZone.getDefault(), true).getBytes();
+ bytes = TimeDatum.asChars(tuple.getTimeDate(index), TimeZone.getDefault(), true).getBytes();
length = bytes.length;
out.write(bytes);
break;
case TIMESTAMP:
- bytes = ((TimestampDatum)datum).asChars(TimeZone.getDefault(), true).getBytes();
+ bytes = TimestampDatum.asChars(tuple.getTimeDate(index), TimeZone.getDefault(), true).getBytes();
length = bytes.length;
out.write(bytes);
break;
case INET6:
case BLOB:
- bytes = Base64.encodeBase64(datum.asByteArray(), false);
+ bytes = Base64.encodeBase64(tuple.getBytes(index), false);
length = bytes.length;
out.write(bytes, 0, length);
break;
case PROTOBUF:
- ProtobufDatum protobuf = (ProtobufDatum) datum;
+ ProtobufDatum protobuf = (ProtobufDatum) tuple.getProtobufDatum(index);
byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
length = protoBytes.length;
out.write(protoBytes, 0, protoBytes.length);
@@ -120,10 +130,13 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
}
@Override
- public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+ public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+
+ Column col = schema.getColumn(index);
+ TajoDataTypes.Type type = col.getDataType().getType();
Datum datum;
- switch (col.getDataType().getType()) {
+ switch (type) {
case BOOLEAN:
datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
: DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
index 043409a..c42cdd6 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -22,8 +22,6 @@ import com.google.common.base.Objects;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
-import java.util.Comparator;
-
/**
* It represents a pair of start and end tuples.
*/
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
index 33f9f1c..1f43ef8 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -21,6 +21,7 @@ package org.apache.tajo.tuple.offheap;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
+import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.*;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.Tuple;
@@ -29,6 +30,7 @@ import org.apache.tajo.util.SizeOf;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.UnsafeUtil;
+import org.apache.tajo.util.datetime.TimeMeta;
import sun.misc.Unsafe;
import java.nio.ByteBuffer;
@@ -53,6 +55,16 @@ public class HeapTuple implements Tuple {
return data.length;
}
+ @Override
+ public TajoDataTypes.Type type(int fieldId) {
+ return types[fieldId].getType();
+ }
+
+ @Override
+ public int size(int fieldId) {
+ return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+ }
+
public ByteBuffer nioBuffer() {
return ByteBuffer.wrap(data);
}
@@ -75,13 +87,18 @@ public class HeapTuple implements Tuple {
}
@Override
- public boolean isNull(int fieldid) {
+ public boolean isBlank(int fieldid) {
return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
}
@Override
- public boolean isNotNull(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+ public boolean isBlankOrNull(int fieldid) {
+ return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
}
@Override
@@ -95,23 +112,13 @@ public class HeapTuple implements Tuple {
}
@Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
- }
-
- @Override
public void put(Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+ throw new UnsupportedException("UnSafeTuple does not support put(Datum[]).");
}
@Override
- public Datum get(int fieldId) {
- if (isNull(fieldId)) {
+ public Datum asDatum(int fieldId) {
+ if (isBlankOrNull(fieldId)) {
return NullDatum.get();
}
@@ -184,6 +191,11 @@ public class HeapTuple implements Tuple {
}
@Override
+ public byte[] getTextBytes(int fieldId) {
+ return getText(fieldId).getBytes();
+ }
+
+ @Override
public short getInt2(int fieldId) {
return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
}
@@ -213,6 +225,11 @@ public class HeapTuple implements Tuple {
return new String(getBytes(fieldId));
}
+ @Override
+ public TimeMeta getTimeDate(int fieldId) {
+ return asDatum(fieldId).asTimeMeta();
+ }
+
public IntervalDatum getInterval(int fieldId) {
long pos = checkNullAndGetOffset(fieldId);
int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
@@ -257,7 +274,7 @@ public class HeapTuple implements Tuple {
Datum [] datums = new Datum[size()];
for (int i = 0; i < size(); i++) {
if (contains(i)) {
- datums[i] = get(i);
+ datums[i] = asDatum(i);
} else {
datums[i] = NullDatum.get();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
index b742e6d..e7bd2aa 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
+import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.*;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.Tuple;
@@ -30,6 +31,7 @@ import org.apache.tajo.util.SizeOf;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.UnsafeUtil;
+import org.apache.tajo.util.datetime.TimeMeta;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;
@@ -63,6 +65,16 @@ public abstract class UnSafeTuple implements Tuple {
return types.length;
}
+ @Override
+ public TajoDataTypes.Type type(int fieldId) {
+ return types[fieldId].getType();
+ }
+
+ @Override
+ public int size(int fieldId) {
+ return UNSAFE.getInt(getFieldAddr(fieldId));
+ }
+
public ByteBuffer nioBuffer() {
return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
}
@@ -105,13 +117,13 @@ public abstract class UnSafeTuple implements Tuple {
}
@Override
- public boolean isNull(int fieldid) {
+ public boolean isBlank(int fieldid) {
return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
}
@Override
- public boolean isNotNull(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+ public boolean isBlankOrNull(int fieldid) {
+ return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
}
@Override
@@ -125,23 +137,18 @@ public abstract class UnSafeTuple implements Tuple {
}
@Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
- }
-
- @Override
public void put(int fieldId, Tuple tuple) {
throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
}
@Override
public void put(Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+ throw new UnsupportedException("UnSafeTuple does not support put(Datum[]).");
}
@Override
- public Datum get(int fieldId) {
- if (isNull(fieldId)) {
+ public Datum asDatum(int fieldId) {
+ if (isBlankOrNull(fieldId)) {
return NullDatum.get();
}
@@ -214,6 +221,17 @@ public abstract class UnSafeTuple implements Tuple {
}
@Override
+ public byte[] getTextBytes(int fieldId) {
+ long pos = getFieldAddr(fieldId);
+ int len = UNSAFE.getInt(pos);
+ pos += SizeOf.SIZE_OF_INT;
+
+ byte[] bytes = new byte[len];
+ UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+ return bytes;
+ }
+
+ @Override
public short getInt2(int fieldId) {
long addr = getFieldAddr(fieldId);
return UNSAFE.getShort(addr);
@@ -241,13 +259,7 @@ public abstract class UnSafeTuple implements Tuple {
@Override
public String getText(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int len = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return new String(bytes);
+ return new String(getTextBytes(fieldId));
}
public IntervalDatum getInterval(int fieldId) {
@@ -285,6 +297,11 @@ public abstract class UnSafeTuple implements Tuple {
}
@Override
+ public TimeMeta getTimeDate(int fieldId) {
+ return null;
+ }
+
+ @Override
public Tuple clone() throws CloneNotSupportedException {
return toHeapTuple();
}
@@ -294,7 +311,7 @@ public abstract class UnSafeTuple implements Tuple {
Datum [] datums = new Datum[size()];
for (int i = 0; i < size(); i++) {
if (contains(i)) {
- datums[i] = get(i);
+ datums[i] = asDatum(i);
} else {
datums[i] = NullDatum.get();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
index 0251dc7..d0ee8e0 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
@@ -33,8 +33,7 @@ public class TestFrameTuple {
@Before
public void setUp() throws Exception {
- tuple1 = new VTuple(11);
- tuple1.put(new Datum[] {
+ tuple1 = new VTuple(new Datum[] {
DatumFactory.createBool(true),
DatumFactory.createBit((byte) 0x99),
DatumFactory.createChar('9'),
@@ -48,8 +47,7 @@ public class TestFrameTuple {
DatumFactory.createInet4("192.168.0.1")
});
- tuple2 = new VTuple(11);
- tuple2.put(new Datum[] {
+ tuple2 = new VTuple(new Datum[] {
DatumFactory.createBool(true),
DatumFactory.createBit((byte) 0x99),
DatumFactory.createChar('9'),
@@ -76,9 +74,9 @@ public class TestFrameTuple {
assertTrue(frame.contains(i));
}
- assertEquals(DatumFactory.createInt8(23l), frame.get(5));
- assertEquals(DatumFactory.createInt8(23l), frame.get(16));
- assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
- assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
+ assertEquals(23l, frame.getInt8(5));
+ assertEquals(23l, frame.getInt8(16));
+ assertEquals("192.168.0.1", frame.getText(10));
+ assertEquals("192.168.0.1", frame.getText(21));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
index 9e7f334..96f90e7 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -71,6 +71,7 @@ public class TestLazyTuple {
sb.append(NullDatum.get());
textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|', 13);
serde = new TextSerializerDeserializer();
+ serde.init(schema);
}
@Test
@@ -194,31 +195,6 @@ public class TestLazyTuple {
}
@Test
- public void testPutTuple() {
- int colNum = schema.size();
- LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(2, DatumFactory.createInt4(3));
-
-
- Schema schema2 = new Schema();
- schema2.addColumn("col1", TajoDataTypes.Type.INT8);
- schema2.addColumn("col2", TajoDataTypes.Type.INT8);
-
- LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
- t2.put(0, DatumFactory.createInt4(4));
- t2.put(1, DatumFactory.createInt4(5));
-
- t1.put(3, t2);
-
- for (int i = 0; i < 5; i++) {
- assertEquals(i + 1, t1.get(i).asInt4());
- }
- }
-
- @Test
public void testInvalidNumber() {
byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|', 5);
Schema schema = new Schema();
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
index 5e94531..9eec96f 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -51,17 +51,14 @@ public class TestTupleComparator {
schema.addColumn("col4", Type.INT4);
schema.addColumn("col5", Type.TEXT);
- Tuple tuple1 = new VTuple(5);
- Tuple tuple2 = new VTuple(5);
-
- tuple1.put(
+ VTuple tuple1 = new VTuple(
new Datum[] {
DatumFactory.createInt4(9),
DatumFactory.createInt4(3),
DatumFactory.createInt4(33),
DatumFactory.createInt4(4),
DatumFactory.createText("abc")});
- tuple2.put(
+ VTuple tuple2 = new VTuple(
new Datum[] {
DatumFactory.createInt4(1),
DatumFactory.createInt4(25),
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
index 1bbd9ec..4ef595c 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -121,34 +121,15 @@ public class TestVTuple {
}
@Test
- public void testPutTuple() {
- Tuple t1 = new VTuple(5);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(2, DatumFactory.createInt4(3));
-
- Tuple t2 = new VTuple(2);
- t2.put(0, DatumFactory.createInt4(4));
- t2.put(1, DatumFactory.createInt4(5));
-
- t1.put(3, t2);
-
- for (int i = 0; i < 5; i++) {
- assertEquals(i+1, t1.get(i).asInt4());
- }
- }
-
- @Test
public void testClone() throws CloneNotSupportedException {
- Tuple t1 = new VTuple(5);
+ VTuple t1 = new VTuple(5);
t1.put(0, DatumFactory.createInt4(1));
t1.put(1, DatumFactory.createInt4(2));
t1.put(3, DatumFactory.createInt4(2));
t1.put(4, DatumFactory.createText("str"));
- VTuple t2 = (VTuple) t1.clone();
+ VTuple t2 = t1.clone();
assertNotSame(t1, t2);
assertEquals(t1, t2);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
index c43ba38..278d733 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
@@ -497,79 +497,79 @@ public class TestOffHeapRowBlock {
public static void validateNullity(int j, Tuple tuple) {
if (j == 0) {
- tuple.isNull(0);
+ tuple.isBlankOrNull(0);
} else {
assertTrue((j % 1 == 0) == tuple.getBool(0));
}
if (j % 1 == 0) {
- tuple.isNull(1);
+ tuple.isBlankOrNull(1);
} else {
assertTrue(1 == tuple.getInt2(1));
}
if (j % 2 == 0) {
- tuple.isNull(2);
+ tuple.isBlankOrNull(2);
} else {
assertEquals(j, tuple.getInt4(2));
}
if (j % 3 == 0) {
- tuple.isNull(3);
+ tuple.isBlankOrNull(3);
} else {
assertEquals(j, tuple.getInt8(3));
}
if (j % 4 == 0) {
- tuple.isNull(4);
+ tuple.isBlankOrNull(4);
} else {
assertTrue(j == tuple.getFloat4(4));
}
if (j % 5 == 0) {
- tuple.isNull(5);
+ tuple.isBlankOrNull(5);
} else {
assertTrue(j == tuple.getFloat8(5));
}
if (j % 6 == 0) {
- tuple.isNull(6);
+ tuple.isBlankOrNull(6);
} else {
assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6));
}
if (j % 7 == 0) {
- tuple.isNull(7);
+ tuple.isBlankOrNull(7);
} else {
assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
}
if (j % 8 == 0) {
- tuple.isNull(8);
+ tuple.isBlankOrNull(8);
} else {
assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
}
if (j % 9 == 0) {
- tuple.isNull(9);
+ tuple.isBlankOrNull(9);
} else {
assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
}
if (j % 10 == 0) {
- tuple.isNull(10);
+ tuple.isBlankOrNull(10);
} else {
assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
}
if (j % 11 == 0) {
- tuple.isNull(11);
+ tuple.isBlankOrNull(11);
} else {
assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
}
if (j % 12 == 0) {
- tuple.isNull(12);
+ tuple.isBlankOrNull(12);
} else {
assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
index 0e3441b..425f392 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -151,11 +151,10 @@ public abstract class AbstractHBaseAppender implements Appender {
if (rowkeyColumnIndexes.length > 1) {
bout.reset();
for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
- datum = tuple.get(rowkeyColumnIndexes[i]);
if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
- rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+ rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), tuple, i);
} else {
- rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+ rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), tuple, i);
}
bout.write(rowkey);
if (i < rowkeyColumnIndexes.length - 1) {
@@ -165,11 +164,10 @@ public abstract class AbstractHBaseAppender implements Appender {
rowkey = bout.toByteArray();
} else {
int index = rowkeyColumnIndexes[0];
- datum = tuple.get(index);
if (isBinaryColumns[index]) {
- rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
+ rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), tuple, index);
} else {
- rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
+ rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), tuple, index);
}
}
@@ -182,12 +180,11 @@ public abstract class AbstractHBaseAppender implements Appender {
if (isRowKeyMappings[i]) {
continue;
}
- Datum datum = tuple.get(i);
byte[] value;
if (isBinaryColumns[i]) {
- value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+ value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), tuple, i);
} else {
- value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+ value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), tuple, i);
}
if (isColumnKeys[i]) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
index 53ff9dc..40c4aea 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
@@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.Bytes;
import java.io.IOException;
@@ -98,4 +99,38 @@ public class HBaseBinarySerializerDeserializer {
return bytes;
}
+
+ public static byte[] serialize(Column col, Tuple tuple, int index) throws IOException {
+ if (tuple.isBlankOrNull(index)) {
+ return null;
+ }
+
+ byte[] bytes;
+ switch (col.getDataType().getType()) {
+ case INT1:
+ case INT2:
+ bytes = Bytes.toBytes(tuple.getInt2(index));
+ break;
+ case INT4:
+ bytes = Bytes.toBytes(tuple.getInt4(index));
+ break;
+ case INT8:
+ bytes = Bytes.toBytes(tuple.getInt8(index));
+ break;
+ case FLOAT4:
+ bytes = Bytes.toBytes(tuple.getFloat4(index));
+ break;
+ case FLOAT8:
+ bytes = Bytes.toBytes(tuple.getFloat8(index));
+ break;
+ case TEXT:
+ bytes = Bytes.toBytes(tuple.getText(index));
+ break;
+ default:
+ bytes = null;
+ break;
+ }
+
+ return bytes;
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index b1a2d59..19fdf80 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -27,7 +27,6 @@ import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.storage.Tuple;
@@ -65,12 +64,11 @@ public class HBasePutAppender extends AbstractHBaseAppender {
if (isRowKeyMappings[i]) {
continue;
}
- Datum datum = tuple.get(i);
byte[] value;
if (isBinaryColumns[i]) {
- value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+ value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), tuple, i);
} else {
- value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+ value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), tuple, i);
}
if (isColumnKeys[i]) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 670f87e..845c2d7 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -1008,7 +1008,7 @@ public class HBaseTablespace extends Tablespace {
Tuple previousTuple = dataRange.getStart();
for (byte[] eachEndKey : endKeys) {
- Tuple endTuple = new VTuple(sortSpecs.length);
+ VTuple endTuple = new VTuple(sortSpecs.length);
byte[][] rowKeyFields;
if (sortSpecs.length > 1) {
byte[][] splitValues = BytesUtils.splitPreserveAllTokens(
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
index a0ad492..ea5d0b0 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
@@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.NumberUtil;
import java.io.IOException;
@@ -68,4 +69,12 @@ public class HBaseTextSerializerDeserializer {
return datum.asChars().getBytes();
}
+
+ public static byte[] serialize(Column col, Tuple tuple, int index) throws IOException {
+ if (tuple.isBlankOrNull(index)) {
+ return null;
+ }
+
+ return tuple.getBytes(index);
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index 5fc96f1..ee3095c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -33,7 +33,6 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.compress.CodecPool;
@@ -149,6 +148,7 @@ public class CSVFile {
String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
TextSerializerDeserializer.class.getName());
serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ serde.init(schema);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
@@ -163,12 +163,10 @@ public class CSVFile {
@Override
public void addTuple(Tuple tuple) throws IOException {
- Datum datum;
int rowBytes = 0;
for (int i = 0; i < columnNum; i++) {
- datum = tuple.get(i);
- rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
+ rowBytes += serde.serialize(i, tuple, os, nullChars);
if(columnNum - 1 > i){
os.write(delimiter);
@@ -176,7 +174,7 @@ public class CSVFile {
}
if (isShuffle) {
// it is to calculate min/max values, and it is only used for the intermediate file.
- stats.analyzeField(i, datum);
+ stats.analyzeField(i, tuple);
}
}
os.write(LF);
@@ -358,6 +356,7 @@ public class CSVFile {
String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
TextSerializerDeserializer.class.getName());
serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ serde.init(schema);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
index 0b3755d..cfd5a79 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
import io.netty.buffer.ByteBuf;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.storage.text.TextLineParsingError;
@@ -29,9 +30,11 @@ import java.io.OutputStream;
public interface FieldSerializerDeserializer {
- public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
+ void init(Schema schema);
- public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
+ int serialize(int columnIndex, Tuple datum, OutputStream out, byte[] nullChars) throws IOException;
+
+ Datum deserialize(int columnIndex, ByteBuf buf, ByteBuf nullChars)
throws IOException, TextLineParsingError;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 5213ba0..4e9bcda 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -634,10 +634,10 @@ public class RawFile {
nullFlags.clear();
for (int i = 0; i < schema.size(); i++) {
if (enabledStats) {
- stats.analyzeField(i, t.get(i));
+ stats.analyzeField(i, t);
}
- if (t.isNull(i)) {
+ if (t.isBlankOrNull(i)) {
nullFlags.set(i);
continue;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 1ff6c4f..0e628d4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -376,56 +376,56 @@ public class RowFile {
for (int i = 0; i < schema.size(); i++) {
if (enabledStats) {
- stats.analyzeField(i, t.get(i));
+ stats.analyzeField(i, t);
}
- if (t.isNull(i)) {
+ if (t.isBlankOrNull(i)) {
nullFlags.set(i);
} else {
col = schema.getColumn(i);
switch (col.getDataType().getType()) {
case BOOLEAN:
- buffer.put(t.get(i).asByte());
+ buffer.put(t.getByte(i));
break;
case BIT:
- buffer.put(t.get(i).asByte());
+ buffer.put(t.getByte(i));
break;
case CHAR:
- byte[] src = t.get(i).asByteArray();
+ byte[] src = t.getBytes(i);
byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
buffer.putInt(src.length);
buffer.put(dst);
break;
case TEXT:
- byte [] strbytes = t.get(i).asByteArray();
+ byte [] strbytes = t.getBytes(i);
buffer.putShort((short)strbytes.length);
buffer.put(strbytes, 0, strbytes.length);
break;
case INT2:
- buffer.putShort(t.get(i).asInt2());
+ buffer.putShort(t.getInt2(i));
break;
case INT4:
- buffer.putInt(t.get(i).asInt4());
+ buffer.putInt(t.getInt4(i));
break;
case INT8:
- buffer.putLong(t.get(i).asInt8());
+ buffer.putLong(t.getInt8(i));
break;
case FLOAT4:
- buffer.putFloat(t.get(i).asFloat4());
+ buffer.putFloat(t.getFloat4(i));
break;
case FLOAT8:
- buffer.putDouble(t.get(i).asFloat8());
+ buffer.putDouble(t.getFloat8(i));
break;
case BLOB:
- byte [] bytes = t.get(i).asByteArray();
+ byte [] bytes = t.getBytes(i);
buffer.putShort((short)bytes.length);
buffer.put(bytes);
break;
case INET4:
- buffer.put(t.get(i).asByteArray());
+ buffer.put(t.getBytes(i));
break;
case INET6:
- buffer.put(t.get(i).asByteArray());
+ buffer.put(t.getBytes(i));
break;
case NULL_TYPE:
nullFlags.set(i);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index da426ea..2782955 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -33,7 +33,6 @@ import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.FileAppender;
import org.apache.tajo.storage.TableStatistics;
import org.apache.tajo.storage.Tuple;
@@ -104,7 +103,7 @@ public class AvroAppender extends FileAppender {
}
private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
- if (tuple.get(i) instanceof NullDatum) {
+ if (tuple.isBlankOrNull(i)) {
return null;
}
switch (avroType) {
@@ -141,7 +140,7 @@ public class AvroAppender extends FileAppender {
for (int i = 0; i < schema.size(); ++i) {
Column column = schema.getColumn(i);
if (enabledStats) {
- stats.analyzeField(i, tuple.get(i));
+ stats.analyzeField(i, tuple);
}
Object value;
Schema.Field avroField = avroFields.get(i);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
index 34e9661..60c32a7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
@@ -56,7 +56,7 @@ public class JsonLineSerializer extends TextLineSerializer {
JSONObject jsonObject = new JSONObject();
for (int i = 0; i < columnNum; i++) {
- if (input.isNull(i)) {
+ if (input.isBlankOrNull(i)) {
continue;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 415c338..45960aa 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -108,7 +108,7 @@ public class ParquetAppender extends FileAppender {
public void addTuple(Tuple tuple) throws IOException {
if (enabledStats) {
for (int i = 0; i < schema.size(); ++i) {
- stats.analyzeField(i, tuple.get(i));
+ stats.analyzeField(i, tuple);
}
}
writer.write(tuple);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 4c675a4..7f236b6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -158,7 +158,7 @@ public class TajoRecordConverter extends GroupConverter {
final int projectionIndex = projectionMap[i];
Column column = tajoReadSchema.getColumn(projectionIndex);
if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
- || currentTuple.get(i) == null) {
+ || currentTuple.isBlankOrNull(i)) {
set(projectionIndex, NullDatum.get());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index dd951e1..de2a1e3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.Datum;
import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
import org.apache.tajo.storage.Tuple;
import parquet.hadoop.api.WriteSupport;
@@ -99,11 +98,10 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
continue;
}
- Datum datum = tuple.get(tajoIndex);
Type fieldType = fields.get(index);
- if (!tuple.isNull(tajoIndex)) {
+ if (!tuple.isBlankOrNull(tajoIndex)) {
recordConsumer.startField(fieldType.getName(), index);
- writeValue(fieldType, column, datum);
+ writeValue(fieldType, column, tuple, tajoIndex);
recordConsumer.endField(fieldType.getName(), index);
} else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
throw new RuntimeException("Null-value for required field: " +
@@ -113,40 +111,40 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
}
}
- private void writeValue(Type fieldType, Column column, Datum datum) {
+ private void writeValue(Type fieldType, Column column, Tuple tuple, int index) {
switch (column.getDataType().getType()) {
case BOOLEAN:
- recordConsumer.addBoolean(datum.asBool());
+ recordConsumer.addBoolean(tuple.getBool(index));
break;
case BIT:
case INT2:
case INT4:
- recordConsumer.addInteger(datum.asInt4());
+ recordConsumer.addInteger(tuple.getInt4(index));
break;
case INT8:
- recordConsumer.addLong(datum.asInt8());
+ recordConsumer.addLong(tuple.getInt8(index));
break;
case FLOAT4:
- recordConsumer.addFloat(datum.asFloat4());
+ recordConsumer.addFloat(tuple.getFloat4(index));
break;
case FLOAT8:
- recordConsumer.addDouble(datum.asFloat8());
+ recordConsumer.addDouble(tuple.getFloat8(index));
break;
case CHAR:
- if (datum.size() > column.getDataType().getLength()) {
+ if (tuple.size(index) > column.getDataType().getLength()) {
throw new ValueTooLongForTypeCharactersException(column.getDataType().getLength());
}
- recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes()));
+ recordConsumer.addBinary(Binary.fromByteArray(tuple.getTextBytes(index)));
break;
case TEXT:
- recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes()));
+ recordConsumer.addBinary(Binary.fromByteArray(tuple.getBytes(index)));
break;
case PROTOBUF:
case BLOB:
case INET4:
case INET6:
- recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray()));
+ recordConsumer.addBinary(Binary.fromByteArray(tuple.getBytes(index)));
break;
default:
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index af260b4..1dcec5f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -646,8 +645,12 @@ public class RCFile {
valLenBuffer = new NonSyncByteArrayOutputStream();
}
- public int append(Column column, Datum datum) throws IOException {
- int currentLen = serde.serialize(column, datum, columnValBuffer, nullChars);
+ public int append(NullDatum nill) {
+ return nullChars.length;
+ }
+
+ public int append(Tuple tuple, int i) throws IOException {
+ int currentLen = serde.serialize(i, tuple, columnValBuffer, nullChars);
columnValueLength += currentLen;
uncompressedColumnValueLength += currentLen;
@@ -765,6 +768,7 @@ public class RCFile {
BinarySerializerDeserializer.class.getName());
try {
serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ serde.init(schema);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
@@ -892,20 +896,19 @@ public class RCFile {
int size = schema.size();
for (int i = 0; i < size; i++) {
- Datum datum = tuple.get(i);
- int length = columnBuffers[i].append(schema.getColumn(i), datum);
+ int length = columnBuffers[i].append(tuple, i);
columnBufferSize += length;
if (isShuffle) {
// it is to calculate min/max values, and it is only used for the intermediate file.
- stats.analyzeField(i, datum);
+ stats.analyzeField(i, tuple);
}
}
if (size < columnNumber) {
for (int i = size; i < columnNumber; i++) {
- columnBuffers[i].append(schema.getColumn(i), NullDatum.get());
+ columnBuffers[i].append(NullDatum.get());
if (isShuffle) {
- stats.analyzeField(i, NullDatum.get());
+ stats.analyzeNull(i);
}
}
}
@@ -1377,6 +1380,7 @@ public class RCFile {
serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
}
serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ serde.init(schema);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
@@ -1712,7 +1716,7 @@ public class RCFile {
} else {
colAdvanceRow(j, col);
- Datum datum = serde.deserialize(schema.getColumn(actualColumnIdx),
+ Datum datum = serde.deserialize(actualColumnIdx,
currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars);
tuple.put(j, datum);
col.rowReadIndex += col.prvLength;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index 404352c..9b09d78 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -35,7 +35,6 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.storage.*;
@@ -125,6 +124,7 @@ public class SequenceFileAppender extends FileAppender {
String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE,
TextSerializerDeserializer.class.getName());
serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ serde.init(schema);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IOException(e);
@@ -159,16 +159,13 @@ public class SequenceFileAppender extends FileAppender {
@Override
public void addTuple(Tuple tuple) throws IOException {
- Datum datum;
if (serde instanceof BinarySerializerDeserializer) {
byte nullByte = 0;
int lasti = 0;
for (int i = 0; i < columnNum; i++) {
- datum = tuple.get(i);
-
// set bit to 1 if a field is not null
- if (null != datum) {
+ if (!tuple.isBlank(i)) {
nullByte |= 1 << (i % 8);
}
@@ -179,29 +176,28 @@ public class SequenceFileAppender extends FileAppender {
os.write(nullByte);
for (int j = lasti; j <= i; j++) {
- datum = tuple.get(j);
switch (schema.getColumn(j).getDataType().getType()) {
case TEXT:
- BytesUtils.writeVLong(os, datum.asTextBytes().length);
+ BytesUtils.writeVLong(os, tuple.getTextBytes(j).length);
break;
case PROTOBUF:
- ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+ ProtobufDatum protobufDatum = (ProtobufDatum) tuple.getProtobufDatum(j);
BytesUtils.writeVLong(os, protobufDatum.asByteArray().length);
break;
case CHAR:
case INET4:
case BLOB:
- BytesUtils.writeVLong(os, datum.asByteArray().length);
+ BytesUtils.writeVLong(os, tuple.getBytes(j).length);
break;
default:
}
- serde.serialize(schema.getColumn(j), datum, os, nullChars);
+ serde.serialize(j, tuple, os, nullChars);
if (isShuffle) {
// it is to calculate min/max values, and it is only used for the intermediate file.
- stats.analyzeField(j, datum);
+ stats.analyzeField(j, tuple);
}
}
lasti = i + 1;
@@ -215,8 +211,7 @@ public class SequenceFileAppender extends FileAppender {
} else {
for (int i = 0; i < columnNum; i++) {
- datum = tuple.get(i);
- serde.serialize(schema.getColumn(i), datum, os, nullChars);
+ serde.serialize(i, tuple, os, nullChars);
if (columnNum -1 > i) {
os.write((byte) delimiter);
@@ -224,7 +219,7 @@ public class SequenceFileAppender extends FileAppender {
if (isShuffle) {
// it is to calculate min/max values, and it is only used for the intermediate file.
- stats.analyzeField(i, datum);
+ stats.analyzeField(i, tuple);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index af0973e..ff73a1c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -117,6 +117,7 @@ public class SequenceFileScanner extends FileScanner {
try {
String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ serde.init(schema);
if (serde instanceof BinarySerializerDeserializer) {
hasBinarySerDe = true;
@@ -225,7 +226,7 @@ public class SequenceFileScanner extends FileScanner {
for (int j = 0; j < projectionMap.length; j++) {
if (projectionMap[j] == i) {
- Datum datum = serde.deserialize(schema.getColumn(i), bytes, fieldStart[i], fieldLength[i], nullChars);
+ Datum datum = serde.deserialize(i, bytes, fieldStart[i], fieldLength[i], nullChars);
tuple.put(i, datum);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 0901c0b..eabab22 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -19,8 +19,8 @@
package org.apache.tajo.storage.text;
import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Column;
import io.netty.buffer.ByteBufProcessor;
+import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.datum.Datum;
@@ -38,12 +38,10 @@ public class CSVLineDeserializer extends TextLineDeserializer {
private int delimiterCompensation;
private int [] targetColumnIndexes;
- private Column [] projected;
public CSVLineDeserializer(Schema schema, TableMeta meta, Column [] projected) {
super(schema, meta);
- this.projected = projected;
targetColumnIndexes = new int[projected.length];
for (int i = 0; i < projected.length; i++) {
targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName());
@@ -67,13 +65,14 @@ public class CSVLineDeserializer extends TextLineDeserializer {
nullChars = TextLineSerDe.getNullChars(meta);
fieldSerDer = new TextFieldSerializerDeserializer(meta);
+ fieldSerDer.init(schema);
}
public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
- int[] projection = targetColumnIndexes;
if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
return;
}
+ int[] projection = targetColumnIndexes;
final int rowLength = lineBuf.readableBytes();
int start = 0, fieldLength = 0, end = 0;
@@ -93,14 +92,12 @@ public class CSVLineDeserializer extends TextLineDeserializer {
if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
lineBuf.setIndex(start, start + fieldLength);
-
try {
- Datum datum = fieldSerDer.deserialize(lineBuf, projected[currentTarget], currentIndex, nullChars);
+ Datum datum = fieldSerDer.deserialize(currentIndex, lineBuf, nullChars);
output.put(currentTarget, datum);
} catch (Exception e) {
output.put(currentTarget, NullDatum.get());
}
-
currentTarget++;
}