You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:33 UTC
[19/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
new file mode 100644
index 0000000..c6149f7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.BytesUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@code LazyTuple}, a tuple that keeps the raw serialized text bytes
+ * of each column and deserializes a column only when it is first read.
+ */
+public class TestLazyTuple {
+
+  Schema schema;                  // 13-column schema covering most primitive Tajo types
+  byte[][] textRow;               // one serialized text cell per column
+  byte[] nullbytes;               // textual NULL marker ("\N")
+  SerializerDeserializer serde;
+
+  @Before
+  public void setUp() {
+    nullbytes = "\\N".getBytes();
+
+    schema = new Schema();
+    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+    schema.addColumn("col2", TajoDataTypes.Type.BIT);
+    schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
+    schema.addColumn("col4", TajoDataTypes.Type.INT2);
+    schema.addColumn("col5", TajoDataTypes.Type.INT4);
+    schema.addColumn("col6", TajoDataTypes.Type.INT8);
+    schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
+    schema.addColumn("col9", TajoDataTypes.Type.TEXT);
+    schema.addColumn("col10", TajoDataTypes.Type.BLOB);
+    schema.addColumn("col11", TajoDataTypes.Type.INET4);
+    schema.addColumn("col12", TajoDataTypes.Type.INT4);
+    schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
+
+    // Build a '|'-separated text row; col12 holds the NULL marker text.
+    StringBuilder sb = new StringBuilder();
+    sb.append(DatumFactory.createBool(true)).append('|');
+    sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
+    sb.append(DatumFactory.createChar("str")).append('|');
+    sb.append(DatumFactory.createInt2((short) 17)).append('|');
+    sb.append(DatumFactory.createInt4(59)).append('|');
+    sb.append(DatumFactory.createInt8(23L)).append('|');  // 23L, not 23l: lowercase l reads like the digit 1
+    sb.append(DatumFactory.createFloat4(77.9f)).append('|');
+    sb.append(DatumFactory.createFloat8(271.9f)).append('|');
+    sb.append(DatumFactory.createText("str2")).append('|');
+    sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
+    sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
+    sb.append(new String(nullbytes)).append('|');
+    sb.append(NullDatum.get());
+    textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|');
+    serde = new TextSerializerDeserializer();
+  }
+
+  /** Every column of the text row must deserialize back to the original datum. */
+  @Test
+  public void testGetDatum() {
+
+    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
+    assertEquals(DatumFactory.createBool(true), t1.get(0));
+    assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
+    assertEquals(DatumFactory.createChar("str"), t1.get(2));
+    assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
+    assertEquals(DatumFactory.createInt4(59), t1.get(4));
+    assertEquals(DatumFactory.createInt8(23L), t1.get(5));
+    assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
+    assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
+    assertEquals(DatumFactory.createText("str2"), t1.get(8));
+    assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
+    // col12 carries the NULL marker text; col13 is NULL_TYPE.
+    assertEquals(NullDatum.get(), t1.get(11));
+    assertEquals(NullDatum.get(), t1.get(12));
+  }
+
+  /** contains() must report exactly the indexes that were explicitly put. */
+  @Test
+  public void testContain() {
+    int colNum = schema.size();
+
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(3, DatumFactory.createInt4(1));
+    t1.put(7, DatumFactory.createInt4(1));
+
+    assertTrue(t1.contains(0));
+    assertFalse(t1.contains(1));
+    assertFalse(t1.contains(2));
+    assertTrue(t1.contains(3));
+    assertFalse(t1.contains(4));
+    assertFalse(t1.contains(5));
+    assertFalse(t1.contains(6));
+    assertTrue(t1.contains(7));
+    assertFalse(t1.contains(8));
+    assertFalse(t1.contains(9));
+    assertFalse(t1.contains(10));
+    assertFalse(t1.contains(11));
+    assertFalse(t1.contains(12));
+  }
+
+  /** Values written with put() must be readable back with the typed getters. */
+  @Test
+  public void testPut() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    t1.put(0, DatumFactory.createText("str"));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(11, DatumFactory.createFloat4(0.76f));
+
+    assertTrue(t1.contains(0));
+    assertTrue(t1.contains(1));
+
+    assertEquals(t1.getText(0), "str");
+    assertEquals(t1.get(1).asInt4(), 2);
+    // The stored value is exactly the float literal, so a zero delta is safe
+    // and gives a better failure message than assertTrue(x == 0.76f).
+    assertEquals(0.76f, t1.get(11).asFloat4(), 0.0f);
+  }
+
+  /** Equality must hold across LazyTuple instances and against an equivalent VTuple. */
+  @Test
+  public void testEquals() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+
+    assertEquals(t1, t2);
+
+    Tuple t3 = new VTuple(colNum);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(3, DatumFactory.createInt4(2));
+    assertEquals(t1, t3);
+    assertEquals(t2, t3);
+
+    // An empty tuple must not be value-equal to a populated one.
+    // (assertNotSame only checks reference identity and was trivially true here.)
+    LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
+    assertFalse(t1.equals(t4));
+  }
+
+  /** Equal tuples, including an equivalent VTuple, must hash identically. */
+  @Test
+  public void testHashCode() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("str"));
+
+    assertEquals(t1.hashCode(), t2.hashCode());
+
+    Tuple t3 = new VTuple(colNum);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(3, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createText("str"));
+    assertEquals(t1.hashCode(), t3.hashCode());
+    assertEquals(t2.hashCode(), t3.hashCode());
+
+    Tuple t4 = new VTuple(5);
+    t4.put(0, DatumFactory.createInt4(1));
+    t4.put(1, DatumFactory.createInt4(2));
+    t4.put(4, DatumFactory.createInt4(2));
+
+    // Compare the int hash codes for inequality; the original assertNotSame
+    // compared two autoboxed Integer objects, which is always "not same".
+    assertFalse(t1.hashCode() == t4.hashCode());
+  }
+
+  /** Putting a tuple at an index splices its columns in starting at that index. */
+  @Test
+  public void testPutTuple() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+
+    Schema schema2 = new Schema();
+    schema2.addColumn("col1", TajoDataTypes.Type.INT8);
+    schema2.addColumn("col2", TajoDataTypes.Type.INT8);
+
+    LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i + 1, t1.get(i).asInt4());
+    }
+  }
+
+  /** Malformed or blank numeric text must deserialize to NULL rather than throw. */
+  @Test
+  public void testInvalidNumber() {
+    byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
+    Schema schema = new Schema();
+    schema.addColumn("col1", TajoDataTypes.Type.INT2);
+    schema.addColumn("col2", TajoDataTypes.Type.INT4);
+    schema.addColumn("col3", TajoDataTypes.Type.INT8);
+    schema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
+
+    LazyTuple tuple = new LazyTuple(schema, bytes, 0);
+    assertEquals(bytes.length, tuple.size());
+
+    for (int i = 0; i < tuple.size(); i++){
+      assertEquals(NullDatum.get(), tuple.get(i));
+    }
+  }
+
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    LazyTuple t2 = (LazyTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    // Datum instances are shared: clone() is shallow by contract of this test.
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
new file mode 100644
index 0000000..639ca04
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@code BaseTupleComparator} ordering over a two-key sort specification:
+ * the rows tie on the first key, so the second (text) key decides the result.
+ */
+public class TestTupleComparator {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testCompare() {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.INT4);
+    schema.addColumn("col3", Type.INT4);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.TEXT);
+
+    // Both rows carry 4 in col4; only col5 differs ("abc" < "abd").
+    Tuple smaller = new VTuple(5);
+    smaller.put(0, DatumFactory.createInt4(9));
+    smaller.put(1, DatumFactory.createInt4(3));
+    smaller.put(2, DatumFactory.createInt4(33));
+    smaller.put(3, DatumFactory.createInt4(4));
+    smaller.put(4, DatumFactory.createText("abc"));
+
+    Tuple larger = new VTuple(5);
+    larger.put(0, DatumFactory.createInt4(1));
+    larger.put(1, DatumFactory.createInt4(25));
+    larger.put(2, DatumFactory.createInt4(109));
+    larger.put(3, DatumFactory.createInt4(4));
+    larger.put(4, DatumFactory.createText("abd"));
+
+    SortSpec firstKey = new SortSpec(schema.getColumn("col4"), true, false);
+    SortSpec secondKey = new SortSpec(schema.getColumn("col5"), true, false);
+    BaseTupleComparator comparator =
+        new BaseTupleComparator(schema, new SortSpec[] {firstKey, secondKey});
+
+    // col4 ties, so the text key breaks the tie in both directions.
+    assertEquals(-1, comparator.compare(smaller, larger));
+    assertEquals(1, comparator.compare(larger, smaller));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
new file mode 100644
index 0000000..1bbd9ec
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@code VTuple}, the array-backed in-memory tuple implementation.
+ */
+public class TestVTuple {
+
+  /**
+   * @throws Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+
+  }
+
+  /** contains() must report exactly the indexes that were put, across a wide tuple. */
+  @Test
+  public void testContain() {
+    VTuple t1 = new VTuple(260);
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(1));
+    t1.put(27, DatumFactory.createInt4(1));
+    t1.put(96, DatumFactory.createInt4(1));
+    t1.put(257, DatumFactory.createInt4(1));
+
+    assertTrue(t1.contains(0));
+    assertTrue(t1.contains(1));
+    assertFalse(t1.contains(2));
+    assertFalse(t1.contains(3));
+    assertFalse(t1.contains(4));
+    assertTrue(t1.contains(27));
+    assertFalse(t1.contains(28));
+    assertFalse(t1.contains(95));
+    assertTrue(t1.contains(96));
+    assertFalse(t1.contains(97));
+    assertTrue(t1.contains(257));
+  }
+
+  /** Values written with put() must be readable back with the typed getters. */
+  @Test
+  public void testPut() {
+    VTuple t1 = new VTuple(260);
+    t1.put(0, DatumFactory.createText("str"));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(257, DatumFactory.createFloat4(0.76f));
+
+    assertTrue(t1.contains(0));
+    assertTrue(t1.contains(1));
+
+    assertEquals(t1.getText(0),"str");
+    assertEquals(t1.get(1).asInt4(),2);
+    // The stored value is exactly the float literal, so a zero delta is safe
+    // and reports a useful message on failure.
+    assertEquals(0.76f, t1.get(257).asFloat4(), 0.0f);
+  }
+
+  @Test
+  public void testEquals() {
+    Tuple t1 = new VTuple(5);
+    Tuple t2 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+
+    assertEquals(t1,t2);
+
+    // BUG FIX: the original populated t2 here instead of t3, leaving t3 empty
+    // and then used assertNotSame, which only checks reference identity and
+    // was trivially true. Populate t3 differently (index 4 instead of 3) and
+    // assert value inequality.
+    Tuple t3 = new VTuple(5);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createInt4(2));
+
+    assertFalse(t1.equals(t3));
+  }
+
+  @Test
+  public void testHashCode() {
+    Tuple t1 = new VTuple(5);
+    Tuple t2 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("hyunsik"));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("hyunsik"));
+
+    assertEquals(t1.hashCode(),t2.hashCode());
+
+    Tuple t3 = new VTuple(5);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createInt4(2));
+
+    // Compare the int hash codes; assertNotSame compared two autoboxed
+    // Integer objects and would pass even if the hashes were equal.
+    assertFalse(t1.hashCode() == t3.hashCode());
+  }
+
+  /** Putting a tuple at an index splices its columns in starting at that index. */
+  @Test
+  public void testPutTuple() {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+    Tuple t2 = new VTuple(2);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i+1, t1.get(i).asInt4());
+    }
+  }
+
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    VTuple t2 = (VTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    // Datum instances are shared: clone() is shallow by contract of this test.
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
new file mode 100644
index 0000000..b332364
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.tuple.offheap.*;
+import org.junit.Test;
+
+/**
+ * Tests {@code BaseTupleBuilder}: every row read from an off-heap row block is
+ * rebuilt via the builder and validated both as a heap tuple and as a
+ * zero-copy tuple.
+ */
+public class TestBaseTupleBuilder {
+
+  @Test
+  public void testBuild() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248);
+    try {
+      OffHeapRowBlockReader reader = rowBlock.getReader();
+
+      ZeroCopyTuple inputTuple = new ZeroCopyTuple();
+      int i = 0;
+      while (reader.next(inputTuple)) {
+        RowStoreUtil.convert(inputTuple, builder);
+
+        // The same builder state must produce equivalent heap and zero-copy views.
+        HeapTuple heapTuple = builder.buildToHeapTuple();
+        TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
+
+        ZeroCopyTuple zcTuple = builder.buildToZeroCopyTuple();
+        TestOffHeapRowBlock.validateTupleResult(i, zcTuple);
+
+        i++;
+      }
+    } finally {
+      // Off-heap memory is not garbage collected; the original leaked the block.
+      rowBlock.release();
+    }
+  }
+
+  @Test
+  public void testBuildWithNull() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248);
+    try {
+      OffHeapRowBlockReader reader = rowBlock.getReader();
+
+      ZeroCopyTuple inputTuple = new ZeroCopyTuple();
+      int i = 0;
+      while (reader.next(inputTuple)) {
+        RowStoreUtil.convert(inputTuple, builder);
+
+        // Null columns must survive the rebuild in both tuple forms.
+        HeapTuple heapTuple = builder.buildToHeapTuple();
+        TestOffHeapRowBlock.validateNullity(i, heapTuple);
+
+        ZeroCopyTuple zcTuple = builder.buildToZeroCopyTuple();
+        TestOffHeapRowBlock.validateNullity(i, zcTuple);
+
+        i++;
+      }
+    } finally {
+      // Off-heap memory is not garbage collected; the original leaked the block.
+      rowBlock.release();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
new file mode 100644
index 0000000..96f465a
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
@@ -0,0 +1,45 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.catalog.SchemaUtil;
+import org.junit.Test;
+
+/**
+ * Verifies that a {@code HeapTuple} built from the serialized bytes of a
+ * {@code ZeroCopyTuple} yields the same column values as the off-heap row.
+ */
+public class TestHeapTuple {
+
+  @Test
+  public void testHeapTuple() {
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024);
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    ZeroCopyTuple source = new ZeroCopyTuple();
+    int rowIdx = 0;
+    while (reader.next(source)) {
+      // Copy the row's raw bytes out of the off-heap buffer...
+      byte[] copied = new byte[source.nioBuffer().limit()];
+      source.nioBuffer().get(copied);
+
+      // ...and rebuild it as an on-heap tuple over the same data types.
+      HeapTuple heapTuple = new HeapTuple(copied, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema));
+      TestOffHeapRowBlock.validateTupleResult(rowIdx, heapTuple);
+      rowIdx++;
+    }
+
+    rowBlock.release();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
new file mode 100644
index 0000000..c43ba38
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
@@ -0,0 +1,577 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.ProtoUtil;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.tajo.common.TajoDataTypes.Type;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestOffHeapRowBlock {
+ private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class);
+ public static String UNICODE_FIELD_PREFIX = "abc_가나다_";
+ public static Schema schema;
+
+ static {
+ schema = new Schema();
+ schema.addColumn("col0", Type.BOOLEAN);
+ schema.addColumn("col1", Type.INT2);
+ schema.addColumn("col2", Type.INT4);
+ schema.addColumn("col3", Type.INT8);
+ schema.addColumn("col4", Type.FLOAT4);
+ schema.addColumn("col5", Type.FLOAT8);
+ schema.addColumn("col6", Type.TEXT);
+ schema.addColumn("col7", Type.TIMESTAMP);
+ schema.addColumn("col8", Type.DATE);
+ schema.addColumn("col9", Type.TIME);
+ schema.addColumn("col10", Type.INTERVAL);
+ schema.addColumn("col11", Type.INET4);
+ schema.addColumn("col12",
+ CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName()));
+ }
+
+ // Logs the row block's allocated size (human readable) and how long the
+ // allocation took. Informational only; nothing is asserted.
+ private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) {
+ LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
+ + (endTime - startTime) + " msec");
+ }
+
+ // Interleaves writes with full re-reads: after appending each of the 1000
+ // rows, every row written so far is re-read and validated; a final pass then
+ // re-reads the whole block with a fresh tuple.
+ @Test
+ public void testPutAndReadValidation() {
+ int rowNum = 1000;
+
+ long allocStart = System.currentTimeMillis();
+ // 1024 is the initial size; presumably the block grows as rows are added -- TODO confirm
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
+ long allocEnd = System.currentTimeMillis();
+ explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+ ZeroCopyTuple tuple = new ZeroCopyTuple();
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+ fillRow(i, rowBlock.getWriter());
+
+ // Re-scan from the beginning and validate everything written so far.
+ reader.reset();
+ int j = 0;
+ while(reader.next(tuple)) {
+ validateTupleResult(j, tuple);
+
+ j++;
+ }
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
+
+ // Final full read pass with a fresh tuple.
+ long readStart = System.currentTimeMillis();
+ tuple = new ZeroCopyTuple();
+ int j = 0;
+ reader.reset();
+ while(reader.next(tuple)) {
+ validateTupleResult(j, tuple);
+ j++;
+ }
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ rowBlock.release();
+ }
+
+ // Same interleaved write/re-read structure as testPutAndReadValidation, but
+ // rows are written with nulls (fillRowBlockWithNull) and each read checks
+ // the null pattern instead of the values.
+ @Test
+ public void testNullityValidation() {
+ int rowNum = 1000;
+
+ long allocStart = System.currentTimeMillis();
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
+ long allocEnd = System.currentTimeMillis();
+ explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+ ZeroCopyTuple tuple = new ZeroCopyTuple();
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+ fillRowBlockWithNull(i, rowBlock.getWriter());
+
+ // Re-scan from the beginning and validate the nullity of all rows so far.
+ reader.reset();
+ int j = 0;
+ while(reader.next(tuple)) {
+ validateNullity(j, tuple);
+
+ j++;
+ }
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec");
+
+ // Final full read pass with a fresh tuple.
+ long readStart = System.currentTimeMillis();
+ tuple = new ZeroCopyTuple();
+ int j = 0;
+ reader.reset();
+ while(reader.next(tuple)) {
+ validateNullity(j, tuple);
+
+ j++;
+ }
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ rowBlock.release();
+ }
+
+ /**
+  * Writes rowNum rows with no columns set and checks that both the reader and
+  * the block itself report exactly rowNum rows.
+  */
+ @Test
+ public void testEmptyRow() {
+   int rowNum = 1000;
+
+   long allocStart = System.currentTimeMillis();
+   OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10);
+   long allocEnd = System.currentTimeMillis();
+   explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+   long writeStart = System.currentTimeMillis();
+   for (int i = 0; i < rowNum; i++) {
+     rowBlock.getWriter().startRow();
+     // empty columns
+     rowBlock.getWriter().endRow();
+   }
+   long writeEnd = System.currentTimeMillis();
+   LOG.info("writing took " + (writeEnd - writeStart) + " msec");
+
+   OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+   long readStart = System.currentTimeMillis();
+   ZeroCopyTuple tuple = new ZeroCopyTuple();
+   int j = 0;
+   reader.reset();
+   while(reader.next(tuple)) {
+     j++;
+   }
+   long readEnd = System.currentTimeMillis();
+   LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+   assertEquals(rowNum, j);
+   // BUG FIX: query the block BEFORE releasing it. The original called
+   // release() first and then rows() on already-freed off-heap memory.
+   assertEquals(rowNum, rowBlock.rows());
+   rowBlock.release();
+ }
+
+ // Rough benchmark: materializes all rows as ZeroCopyTuples and sorts them
+ // with a single-key comparator on col2. Timings are only logged; nothing is
+ // asserted about the sort result.
+ @Test
+ public void testSortBenchmark() {
+ int rowNum = 1000;
+
+ OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+ List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList();
+
+ long readStart = System.currentTimeMillis();
+ ZeroCopyTuple tuple = new ZeroCopyTuple();
+ reader.reset();
+ while(reader.next(tuple)) {
+ // A fresh tuple is allocated per row; each stored tuple presumably keeps
+ // pointing into the off-heap block, so instances cannot be reused -- TODO confirm.
+ unSafeTuples.add(tuple);
+ tuple = new ZeroCopyTuple();
+ }
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4));
+ BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec});
+
+ long sortStart = System.currentTimeMillis();
+ Collections.sort(unSafeTuples, comparator);
+ long sortEnd = System.currentTimeMillis();
+ LOG.info("sorting took " + (sortEnd - sortStart) + " msec");
+ rowBlock.release();
+ }
+
+ // Baseline benchmark against the heap-based VTuple: fills, validates, then
+ // scans all tuples counting INT4 columns so the read loop cannot be
+ // optimized away. Timings are only logged.
+ @Test
+ public void testVTuplePutAndGetBenchmark() {
+ int rowNum = 1000;
+
+ List<VTuple> rowBlock = Lists.newArrayList();
+ long writeStart = System.currentTimeMillis();
+ VTuple tuple;
+ for (int i = 0; i < rowNum; i++) {
+ tuple = new VTuple(schema.size());
+ fillVTuple(i, tuple);
+ rowBlock.add(tuple);
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
+
+ long readStart = System.currentTimeMillis();
+ int j = 0;
+ for (VTuple t : rowBlock) {
+ validateTupleResult(j, t);
+ j++;
+ }
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ int count = 0;
+ for (int l = 0; l < rowBlock.size(); l++) {
+ for(int m = 0; m < schema.size(); m++ ) {
+ if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) {
+ count ++;
+ }
+ }
+ }
+ // For preventing unnecessary code elimination optimization.
+ LOG.info("The number of INT4 values is " + count + ".");
+ }
+
+ // Benchmarks encoding VTuples directly into an off-heap row block through
+ // RowStoreUtil.convert, then validates the whole block's contents.
+ @Test
+ public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() {
+ int rowNum = 1000;
+
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100);
+
+ long writeStart = System.currentTimeMillis();
+ // The same VTuple instance is reused; fillVTuple presumably overwrites every
+ // column each iteration -- TODO confirm.
+ VTuple tuple = new VTuple(schema.size());
+ for (int i = 0; i < rowNum; i++) {
+ fillVTuple(i, tuple);
+
+ RowStoreUtil.convert(tuple, rowBlock.getWriter());
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
+
+ validateResults(rowBlock);
+ rowBlock.release();
+ }
+
+ // Round-trips a row block through its ByteBuffer form: restores a new block
+ // from the buffer and validates its contents.
+ @Test
+ public void testSerDerOfRowBlock() {
+ int rowNum = 1000;
+
+ OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+
+ ByteBuffer bb = rowBlock.nioBuffer();
+ OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
+ validateResults(restoredRowBlock);
+ // NOTE(review): restoredRowBlock is never released -- confirm whether it
+ // shares rowBlock's buffer or owns a separate off-heap allocation.
+ rowBlock.release();
+ }
+
+ @Test
+ public void testSerDerOfZeroCopyTuple() {
+ int rowNum = 1000;
+
+ OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+
+ ByteBuffer bb = rowBlock.nioBuffer();
+ OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
+ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock);
+
+ long readStart = System.currentTimeMillis();
+ ZeroCopyTuple tuple = new ZeroCopyTuple();
+ ZeroCopyTuple copyTuple = new ZeroCopyTuple();
+ int j = 0;
+ reader.reset();
+ while(reader.next(tuple)) {
+ ByteBuffer copy = tuple.nioBuffer();
+ copyTuple.set(copy, SchemaUtil.toDataTypes(schema));
+
+ validateTupleResult(j, copyTuple);
+
+ j++;
+ }
+ long readEnd = System.currentTimeMillis();
+ LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+ rowBlock.release();
+ }
+
+ public static OffHeapRowBlock createRowBlock(int rowNum) {
+ long allocateStart = System.currentTimeMillis();
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
+ long allocatedEnd = System.currentTimeMillis();
+ LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
+ + (allocatedEnd - allocateStart) + " msec");
+
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+ fillRow(i, rowBlock.getWriter());
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
+
+ return rowBlock;
+ }
+
+ public static OffHeapRowBlock createRowBlockWithNull(int rowNum) {
+ long allocateStart = System.currentTimeMillis();
+ OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
+ long allocatedEnd = System.currentTimeMillis();
+ LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated "
+ + (allocatedEnd - allocateStart) + " msec");
+
+ long writeStart = System.currentTimeMillis();
+ for (int i = 0; i < rowNum; i++) {
+ fillRowBlockWithNull(i, rowBlock.getWriter());
+ }
+ long writeEnd = System.currentTimeMillis();
+ LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
+
+ return rowBlock;
+ }
+
+ public static void fillRow(int i, RowWriter builder) {
+ builder.startRow();
+ builder.putBool(i % 1 == 0 ? true : false); // 0
+ builder.putInt2((short) 1); // 1
+ builder.putInt4(i); // 2
+ builder.putInt8(i); // 3
+ builder.putFloat4(i); // 4
+ builder.putFloat8(i); // 5
+ builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6
+ builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
+ builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+ builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+ builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+ builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
+ builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
+ builder.endRow();
+ }
+
+ public static void fillRowBlockWithNull(int i, RowWriter writer) {
+ writer.startRow();
+
+ if (i == 0) {
+ writer.skipField();
+ } else {
+ writer.putBool(i % 1 == 0 ? true : false); // 0
+ }
+ if (i % 1 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInt2((short) 1); // 1
+ }
+
+ if (i % 2 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInt4(i); // 2
+ }
+
+ if (i % 3 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInt8(i); // 3
+ }
+
+ if (i % 4 == 0) {
+ writer.skipField();
+ } else {
+ writer.putFloat4(i); // 4
+ }
+
+ if (i % 5 == 0) {
+ writer.skipField();
+ } else {
+ writer.putFloat8(i); // 5
+ }
+
+ if (i % 6 == 0) {
+ writer.skipField();
+ } else {
+ writer.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6
+ }
+
+ if (i % 7 == 0) {
+ writer.skipField();
+ } else {
+ writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
+ }
+
+ if (i % 8 == 0) {
+ writer.skipField();
+ } else {
+ writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+ }
+
+ if (i % 9 == 0) {
+ writer.skipField();
+ } else {
+ writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+ }
+
+ if (i % 10 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+ }
+
+ if (i % 11 == 0) {
+ writer.skipField();
+ } else {
+ writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
+ }
+
+ if (i % 12 == 0) {
+ writer.skipField();
+ } else {
+ writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
+ }
+
+ writer.endRow();
+ }
+
+ public static void fillVTuple(int i, VTuple tuple) {
+ tuple.put(0, DatumFactory.createBool(i % 1 == 0));
+ tuple.put(1, DatumFactory.createInt2((short) 1));
+ tuple.put(2, DatumFactory.createInt4(i));
+ tuple.put(3, DatumFactory.createInt8(i));
+ tuple.put(4, DatumFactory.createFloat4(i));
+ tuple.put(5, DatumFactory.createFloat8(i));
+ tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes()));
+ tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7
+ tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8
+ tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9
+ tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10
+ tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11
+ tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12;
+ }
+
+ public static void validateResults(OffHeapRowBlock rowBlock) {
+ long readStart = System.currentTimeMillis();
+ ZeroCopyTuple tuple = new ZeroCopyTuple();
+ int j = 0;
+ OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+ reader.reset();
+ while(reader.next(tuple)) {
+ validateTupleResult(j, tuple);
+ j++;
+ }
+ long readEnd = System.currentTimeMillis();
+ LOG.info("Reading takes " + (readEnd - readStart) + " msec");
+ }
+
+ public static void validateTupleResult(int j, Tuple t) {
+ assertTrue((j % 1 == 0) == t.getBool(0));
+ assertTrue(1 == t.getInt2(1));
+ assertEquals(j, t.getInt4(2));
+ assertEquals(j, t.getInt8(3));
+ assertTrue(j == t.getFloat4(4));
+ assertTrue(j == t.getFloat8(5));
+ assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6));
+ assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7));
+ assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8));
+ assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9));
+ assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10));
+ assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11));
+ assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12));
+ }
+
+ public static void validateNullity(int j, Tuple tuple) {
+ if (j == 0) {
+ tuple.isNull(0);
+ } else {
+ assertTrue((j % 1 == 0) == tuple.getBool(0));
+ }
+
+ if (j % 1 == 0) {
+ tuple.isNull(1);
+ } else {
+ assertTrue(1 == tuple.getInt2(1));
+ }
+
+ if (j % 2 == 0) {
+ tuple.isNull(2);
+ } else {
+ assertEquals(j, tuple.getInt4(2));
+ }
+
+ if (j % 3 == 0) {
+ tuple.isNull(3);
+ } else {
+ assertEquals(j, tuple.getInt8(3));
+ }
+
+ if (j % 4 == 0) {
+ tuple.isNull(4);
+ } else {
+ assertTrue(j == tuple.getFloat4(4));
+ }
+
+ if (j % 5 == 0) {
+ tuple.isNull(5);
+ } else {
+ assertTrue(j == tuple.getFloat8(5));
+ }
+
+ if (j % 6 == 0) {
+ tuple.isNull(6);
+ } else {
+ assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6));
+ }
+
+ if (j % 7 == 0) {
+ tuple.isNull(7);
+ } else {
+ assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
+ }
+
+ if (j % 8 == 0) {
+ tuple.isNull(8);
+ } else {
+ assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
+ }
+
+ if (j % 9 == 0) {
+ tuple.isNull(9);
+ } else {
+ assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
+ }
+
+ if (j % 10 == 0) {
+ tuple.isNull(10);
+ } else {
+ assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
+ }
+
+ if (j % 11 == 0) {
+ tuple.isNull(11);
+ } else {
+ assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
+ }
+
+ if (j % 12 == 0) {
+ tuple.isNull(12);
+ } else {
+ assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
new file mode 100644
index 0000000..1eb9c17
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.unit.StorageUnit;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestResizableSpec {
+
+ @Test
+ public void testResizableLimit() {
+ ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f);
+
+ long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
+
+ assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
+
+ assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB));
+
+ assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB));
+
+ assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1);
+
+ assertFalse(limit.canIncrease(limit.limit()));
+ }
+
+ @Test
+ public void testFixedLimit() {
+ FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f);
+
+ assertEquals(limit.limit(), 100 * StorageUnit.MB);
+
+ assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000));
+
+ assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB));
+
+ assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB));
+
+ assertFalse(limit.canIncrease(limit.limit()));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
new file mode 100644
index 0000000..d1c561b
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <property>
+ <name>fs.s3.impl</name>
+ <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
+ </property>
+
+ <!-- Storage Manager Configuration -->
+ <property>
+ <name>tajo.storage.manager.hdfs.class</name>
+ <value>org.apache.tajo.storage.FileStorageManager</value>
+ </property>
+ <property>
+ <name>tajo.storage.manager.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+ </property>
+
+ <!--- Registered Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler</name>
+ <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+ </property>
+
+ <!--- Fragment Class Configurations -->
+ <property>
+ <name>tajo.storage.fragment.csv.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.raw.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.rcfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.row.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.trevni.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.parquet.class</name>
+ <value>org.apache.tajo.storage.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.sequencefile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.avro.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+
+ <!--- Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroScanner</value>
+ </property>
+
+ <!--- Appender Handler -->
+ <property>
+ <name>tajo.storage.appender-handler</name>
+ <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroAppender</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml
new file mode 100644
index 0000000..e37149d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/pom.xml
@@ -0,0 +1,349 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright 2012 Database Lab., Korea Univ.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>tajo-project</artifactId>
+ <groupId>org.apache.tajo</groupId>
+ <version>0.9.1-SNAPSHOT</version>
+ <relativePath>../../tajo-project</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>tajo-storage-hbase</artifactId>
+ <packaging>jar</packaging>
+ <name>Tajo HBase Storage</name>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>repository.jboss.org</id>
+ <url>https://repository.jboss.org/nexus/content/repositories/releases/
+ </url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <tajo.test>TRUE</tajo.test>
+ </systemProperties>
+ <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-protobuf-generated-sources-directory</id>
+ <phase>initialize</phase>
+ <configuration>
+ <target>
+ <mkdir dir="target/generated-sources/proto" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2</version>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <executable>protoc</executable>
+ <arguments>
+ <argument>-Isrc/main/proto/</argument>
+ <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+ <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+ <argument>--java_out=target/generated-sources/proto</argument>
+ <argument>src/main/proto/StorageFragmentProtos.proto</argument>
+ </arguments>
+ </configuration>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/proto</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-catalog-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-plan</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-hs</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- build javadoc jars per jar for publishing to maven -->
+ <id>module-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <destDir>${project.build.directory}</destDir>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
new file mode 100644
index 0000000..8615235
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.TUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract class for HBase appender.
+ */
+public abstract class AbstractHBaseAppender implements Appender {
+ protected Configuration conf;
+ protected Schema schema;
+ protected TableMeta meta;
+ protected QueryUnitAttemptId taskAttemptId;
+ protected Path stagingDir;
+ protected boolean inited = false;
+
+ protected ColumnMapping columnMapping;
+ protected TableStatistics stats;
+ protected boolean enabledStats;
+
+ protected int columnNum;
+
+ protected byte[][][] mappingColumnFamilies;
+ protected boolean[] isBinaryColumns;
+ protected boolean[] isRowKeyMappings;
+ protected boolean[] isColumnKeys;
+ protected boolean[] isColumnValues;
+ protected int[] rowKeyFieldIndexes;
+ protected int[] rowkeyColumnIndexes;
+ protected char rowKeyDelimiter;
+
+ // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping
+ protected int[] columnKeyValueDataIndexes;
+ protected byte[][] columnKeyDatas;
+ protected byte[][] columnValueDatas;
+ protected byte[][] columnKeyCfNames;
+
+ protected KeyValue[] keyValues;
+
+ public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ Schema schema, TableMeta meta, Path stagingDir) {
+ this.conf = conf;
+ this.schema = schema;
+ this.meta = meta;
+ this.stagingDir = stagingDir;
+ this.taskAttemptId = taskAttemptId;
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (inited) {
+ throw new IllegalStateException("FileAppender is already initialized.");
+ }
+ inited = true;
+ if (enabledStats) {
+ stats = new TableStatistics(this.schema);
+ }
+ columnMapping = new ColumnMapping(schema, meta);
+
+ mappingColumnFamilies = columnMapping.getMappingColumns();
+
+ isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+ List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>();
+ for (int i = 0; i < isRowKeyMappings.length; i++) {
+ if (isRowKeyMappings[i]) {
+ rowkeyColumnIndexList.add(i);
+ }
+ }
+ rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList);
+
+ isBinaryColumns = columnMapping.getIsBinaryColumns();
+ isColumnKeys = columnMapping.getIsColumnKeys();
+ isColumnValues = columnMapping.getIsColumnValues();
+ rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
+ rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
+
+ this.columnNum = schema.size();
+
+ // In the case of '<cfname>:key:' or '<cfname>:value:' KeyValue object should be set with the qualifier and value
+ // which are mapped to the same column family.
+ columnKeyValueDataIndexes = new int[isColumnKeys.length];
+ int index = 0;
+ int numKeyValues = 0;
+ Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>();
+ for (int i = 0; i < isColumnKeys.length; i++) {
+ if (isRowKeyMappings[i]) {
+ continue;
+ }
+ if (isColumnKeys[i] || isColumnValues[i]) {
+ String cfName = new String(mappingColumnFamilies[i][0]);
+ if (!cfNameIndexMap.containsKey(cfName)) {
+ cfNameIndexMap.put(cfName, index);
+ columnKeyValueDataIndexes[i] = index;
+ index++;
+ numKeyValues++;
+ } else {
+ columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName);
+ }
+ } else {
+ numKeyValues++;
+ }
+ }
+ columnKeyCfNames = new byte[cfNameIndexMap.size()][];
+ for (Map.Entry<String, Integer> entry: cfNameIndexMap.entrySet()) {
+ columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes();
+ }
+ columnKeyDatas = new byte[cfNameIndexMap.size()][];
+ columnValueDatas = new byte[cfNameIndexMap.size()][];
+
+ keyValues = new KeyValue[numKeyValues];
+ }
+
+ private ByteArrayOutputStream bout = new ByteArrayOutputStream();
+
+ protected byte[] getRowKeyBytes(Tuple tuple) throws IOException {
+ Datum datum;
+ byte[] rowkey;
+ if (rowkeyColumnIndexes.length > 1) {
+ bout.reset();
+ for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
+ datum = tuple.get(rowkeyColumnIndexes[i]);
+ if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
+ rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+ } else {
+ rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+ }
+ bout.write(rowkey);
+ if (i < rowkeyColumnIndexes.length - 1) {
+ bout.write(rowKeyDelimiter);
+ }
+ }
+ rowkey = bout.toByteArray();
+ } else {
+ int index = rowkeyColumnIndexes[0];
+ datum = tuple.get(index);
+ if (isBinaryColumns[index]) {
+ rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
+ } else {
+ rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
+ }
+ }
+
+ return rowkey;
+ }
+
+ protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException {
+ int keyValIndex = 0;
+ for (int i = 0; i < columnNum; i++) {
+ if (isRowKeyMappings[i]) {
+ continue;
+ }
+ Datum datum = tuple.get(i);
+ byte[] value;
+ if (isBinaryColumns[i]) {
+ value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+ } else {
+ value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+ }
+
+ if (isColumnKeys[i]) {
+ columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
+ } else if (isColumnValues[i]) {
+ columnValueDatas[columnKeyValueDataIndexes[i]] = value;
+ } else {
+ keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
+ keyValIndex++;
+ }
+ }
+
+ for (int i = 0; i < columnKeyDatas.length; i++) {
+ keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
+ }
+ }
+
+ @Override
+ public void enableStats() {
+ enabledStats = true;
+ }
+
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
new file mode 100644
index 0000000..79161cc
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.SortNode;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+import org.apache.tajo.plan.logical.UnaryNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+
+public class AddSortForInsertRewriter implements RewriteRule {
+ private int[] sortColumnIndexes;
+ private Column[] sortColumns;
+ public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
+ this.sortColumns = sortColumns;
+ this.sortColumnIndexes = new int[sortColumns.length];
+
+ Schema tableSchema = tableDesc.getSchema();
+ for (int i = 0; i < sortColumns.length; i++) {
+ sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "AddSortForInsertRewriter";
+ }
+
+ @Override
+ public boolean isEligible(LogicalPlan plan) {
+ StoreType storeType = PlannerUtil.getStoreType(plan);
+ return storeType != null;
+ }
+
+ @Override
+ public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ UnaryNode insertNode = rootNode.getChild();
+ LogicalNode childNode = insertNode.getChild();
+
+ Schema sortSchema = childNode.getOutSchema();
+ SortNode sortNode = plan.createNode(SortNode.class);
+ sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+ sortNode.setInSchema(sortSchema);
+ sortNode.setOutSchema(sortSchema);
+
+ SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+ int index = 0;
+
+ for (int i = 0; i < sortColumnIndexes.length; i++) {
+ Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+ if (sortColumn == null) {
+ throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
+ }
+ sortSpecs[index++] = new SortSpec(sortColumn, true, true);
+ }
+ sortNode.setSortSpecs(sortSpecs);
+
+ sortNode.setChild(insertNode.getChild());
+ insertNode.setChild(sortNode);
+ plan.getRootBlock().registerNode(sortNode);
+
+ return plan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
new file mode 100644
index 0000000..7ddf09a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ColumnMapping {
+ private TableMeta tableMeta;
+ private Schema schema;
+ private char rowKeyDelimiter;
+
+ private String hbaseTableName;
+
+ private int[] rowKeyFieldIndexes;
+ private boolean[] isRowKeyMappings;
+ private boolean[] isBinaryColumns;
+ private boolean[] isColumnKeys;
+ private boolean[] isColumnValues;
+
+ // schema order -> 0: cf name, 1: column name -> name bytes
+ private byte[][][] mappingColumns;
+
+ private int numRowKeys;
+
+ public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
+ this.schema = schema;
+ this.tableMeta = tableMeta;
+
+ init();
+ }
+
+ public void init() throws IOException {
+ hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
+ String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
+ if (delim.length() > 0) {
+ rowKeyDelimiter = delim.charAt(0);
+ }
+ isRowKeyMappings = new boolean[schema.size()];
+ rowKeyFieldIndexes = new int[schema.size()];
+ isBinaryColumns = new boolean[schema.size()];
+ isColumnKeys = new boolean[schema.size()];
+ isColumnValues = new boolean[schema.size()];
+
+ mappingColumns = new byte[schema.size()][][];
+
+ for (int i = 0; i < schema.size(); i++) {
+ rowKeyFieldIndexes[i] = -1;
+ }
+
+ String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
+ if (columnMapping == null || columnMapping.isEmpty()) {
+ throw new IOException("'columns' property is required.");
+ }
+
+ String[] columnMappingTokens = columnMapping.split(",");
+
+ if (columnMappingTokens.length != schema.getColumns().size()) {
+ throw new IOException("The number of mapped HBase columns is great than the number of Tajo table columns");
+ }
+
+ int index = 0;
+ for (String eachToken: columnMappingTokens) {
+ mappingColumns[index] = new byte[2][];
+
+ byte[][] mappingTokens = BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':');
+
+ if (mappingTokens.length == 3) {
+ if (mappingTokens[0].length == 0) {
+ // cfname
+ throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " +
+ "or '<cfname>:value:' or '<cfname>:value:#b'");
+ }
+ //<cfname>:key: or <cfname>:value:
+ if (mappingTokens[2].length != 0) {
+ String binaryOption = new String(mappingTokens[2]);
+ if ("#b".equals(binaryOption)) {
+ isBinaryColumns[index] = true;
+ } else {
+ throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " +
+ "or '<cfname>:value:' or '<cfname>:value:#b'");
+ }
+ }
+ mappingColumns[index][0] = mappingTokens[0];
+ String keyOrValue = new String(mappingTokens[1]);
+ if (HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
+ isColumnKeys[index] = true;
+ } else if (HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
+ isColumnValues[index] = true;
+ } else {
+ throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
+ }
+ } else if (mappingTokens.length == 2) {
+ //<cfname>: or <cfname>:<qualifier> or :key
+ String cfName = new String(mappingTokens[0]);
+ String columnName = new String(mappingTokens[1]);
+ RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName);
+ if (rowKeyMapping != null) {
+ isRowKeyMappings[index] = true;
+ numRowKeys++;
+ isBinaryColumns[index] = rowKeyMapping.isBinary();
+ if (!cfName.isEmpty()) {
+ if (rowKeyDelimiter == 0) {
+ throw new IOException("hbase.rowkey.delimiter is required.");
+ }
+ rowKeyFieldIndexes[index] = Integer.parseInt(cfName);
+ } else {
+ rowKeyFieldIndexes[index] = -1; //rowkey is mapped a single column.
+ }
+ } else {
+ if (cfName.isEmpty()) {
+ throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
+ }
+ if (cfName != null) {
+ mappingColumns[index][0] = Bytes.toBytes(cfName);
+ }
+
+ if (columnName != null && !columnName.isEmpty()) {
+ String[] columnNameTokens = columnName.split("#");
+ if (columnNameTokens[0].isEmpty()) {
+ mappingColumns[index][1] = null;
+ } else {
+ mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]);
+ }
+ if (columnNameTokens.length == 2 && "b".equals(columnNameTokens[1])) {
+ isBinaryColumns[index] = true;
+ }
+ }
+ }
+ } else {
+ throw new IOException(eachToken + " 'column' attribute '[cfname]:[qualfier]:'");
+ }
+
+ index++;
+ } // for loop
+ }
+
+ public List<String> getColumnFamilyNames() {
+ List<String> cfNames = new ArrayList<String>();
+
+ for (byte[][] eachCfName: mappingColumns) {
+ if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != null) {
+ String cfName = new String(eachCfName[0]);
+ if (!cfNames.contains(cfName)) {
+ cfNames.add(cfName);
+ }
+ }
+ }
+
+ return cfNames;
+ }
+
+ private RowKeyMapping getRowKeyMapping(String cfName, String columnName) {
+ if (columnName == null || columnName.isEmpty()) {
+ return null;
+ }
+
+ String[] tokens = columnName.split("#");
+ if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) {
+ return null;
+ }
+
+ RowKeyMapping rowKeyMapping = new RowKeyMapping();
+
+ if (tokens.length == 2 && "b".equals(tokens[1])) {
+ rowKeyMapping.setBinary(true);
+ }
+
+ if (cfName != null && !cfName.isEmpty()) {
+ rowKeyMapping.setKeyFieldIndex(Integer.parseInt(cfName));
+ }
+ return rowKeyMapping;
+ }
+
+ public char getRowKeyDelimiter() {
+ return rowKeyDelimiter;
+ }
+
+ public int[] getRowKeyFieldIndexes() {
+ return rowKeyFieldIndexes;
+ }
+
+ public boolean[] getIsRowKeyMappings() {
+ return isRowKeyMappings;
+ }
+
+ public byte[][][] getMappingColumns() {
+ return mappingColumns;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public boolean[] getIsBinaryColumns() {
+ return isBinaryColumns;
+ }
+
+ public String getHbaseTableName() {
+ return hbaseTableName;
+ }
+
+ public boolean[] getIsColumnKeys() {
+ return isColumnKeys;
+ }
+
+ public int getNumRowKeys() {
+ return numRowKeys;
+ }
+
+ public boolean[] getIsColumnValues() {
+ return isColumnValues;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
new file mode 100644
index 0000000..c05c5bb
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+
+public class HBaseBinarySerializerDeserializer {
+
+ public static Datum deserialize(Column col, byte[] bytes) throws IOException {
+ Datum datum;
+ switch (col.getDataType().getType()) {
+ case INT1:
+ case INT2:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt2(Bytes.toShort(bytes));
+ break;
+ case INT4:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt4(Bytes.toInt(bytes));
+ break;
+ case INT8:
+ if (bytes.length == 4) {
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt8(Bytes.toInt(bytes));
+ } else {
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt8(Bytes.toLong(bytes));
+ }
+ break;
+ case FLOAT4:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat4(Bytes.toFloat(bytes));
+ break;
+ case FLOAT8:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat8(Bytes.toDouble(bytes));
+ break;
+ case TEXT:
+ datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
+ break;
+ default:
+ datum = NullDatum.get();
+ break;
+ }
+ return datum;
+ }
+
+ public static byte[] serialize(Column col, Datum datum) throws IOException {
+ if (datum == null || datum instanceof NullDatum) {
+ return null;
+ }
+
+ byte[] bytes;
+ switch (col.getDataType().getType()) {
+ case INT1:
+ case INT2:
+ bytes = Bytes.toBytes(datum.asInt2());
+ break;
+ case INT4:
+ bytes = Bytes.toBytes(datum.asInt4());
+ break;
+ case INT8:
+ bytes = Bytes.toBytes(datum.asInt8());
+ break;
+ case FLOAT4:
+ bytes = Bytes.toBytes(datum.asFloat4());
+ break;
+ case FLOAT8:
+ bytes = Bytes.toBytes(datum.asFloat8());
+ break;
+ case TEXT:
+ bytes = Bytes.toBytes(datum.asChars());
+ break;
+ default:
+ bytes = null;
+ break;
+ }
+
+ return bytes;
+ }
+}