Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:25 UTC
[10/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
new file mode 100644
index 0000000..a0ad492
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+
+public class HBaseTextSerializerDeserializer {
+ public static Datum deserialize(Column col, byte[] bytes) throws IOException {
+ Datum datum;
+ switch (col.getDataType().getType()) {
+ case INT1:
+ case INT2:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createInt2((short)NumberUtil.parseInt(bytes, 0, bytes.length));
+ break;
+ case INT4:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length));
+ break;
+ case INT8:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createInt8(new String(bytes, 0, bytes.length));
+ break;
+ case FLOAT4:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createFloat4(new String(bytes, 0, bytes.length));
+ break;
+ case FLOAT8:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length));
+ break;
+ case TEXT:
+ datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
+ break;
+ default:
+ datum = NullDatum.get();
+ break;
+ }
+ return datum;
+ }
+
+ public static byte[] serialize(Column col, Datum datum) throws IOException {
+ if (datum == null || datum instanceof NullDatum) {
+ return null;
+ }
+
+ return datum.asChars().getBytes();
+ }
+}
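
For reference, a minimal usage sketch of this class (not part of the commit); the column name and sample value are hypothetical:

    import org.apache.tajo.catalog.Column;
    import org.apache.tajo.common.TajoDataTypes.Type;
    import org.apache.tajo.datum.Datum;
    import org.apache.tajo.storage.hbase.HBaseTextSerializerDeserializer;

    public class RoundTripSketch {
      public static void main(String[] args) throws Exception {
        Column col = new Column("c1", Type.INT4);   // hypothetical INT4 column
        // deserialize parses the textual cell bytes into a typed Datum
        Datum datum = HBaseTextSerializerDeserializer.deserialize(col, "42".getBytes());
        // serialize renders the datum back to its character form
        byte[] bytes = HBaseTextSerializerDeserializer.serialize(col, datum);
        System.out.println(datum.asInt4() + " / " + new String(bytes)); // 42 / 42
      }
    }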
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
new file mode 100644
index 0000000..07f7988
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.util.TreeSet;
+
+public class HFileAppender extends AbstractHBaseAppender {
+ private static final Log LOG = LogFactory.getLog(HFileAppender.class);
+
+ private RecordWriter<ImmutableBytesWritable, Cell> writer;
+ private TaskAttemptContext writerContext;
+ private Path workingFilePath;
+ private FileOutputCommitter committer;
+
+ public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ Schema schema, TableMeta meta, Path stagingDir) {
+ super(conf, taskAttemptId, schema, meta, stagingDir);
+ }
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+
+ Configuration taskConf = new Configuration();
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+ taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
+
+ ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId();
+ writerContext = new TaskAttemptContextImpl(taskConf,
+ new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
+ taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId()));
+
+ HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
+ try {
+ writer = hFileOutputFormat2.getRecordWriter(writerContext);
+
+ committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext);
+ workingFilePath = committer.getWorkPath();
+ } catch (InterruptedException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+
+ LOG.info("Created hbase file writer: " + workingFilePath);
+ }
+
+ long totalNumBytes = 0;
+ ImmutableBytesWritable keyWritable = new ImmutableBytesWritable();
+ boolean first = true;
+ TreeSet<KeyValue> kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+
+
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ Datum datum;
+
+ byte[] rowkey = getRowKeyBytes(tuple);
+
+ if (!first && !Bytes.equals(keyWritable.get(), 0, keyWritable.getLength(), rowkey, 0, rowkey.length)) {
+ try {
+ for (KeyValue kv : kvSet) {
+ writer.write(keyWritable, kv);
+ totalNumBytes += keyWritable.getLength() + kv.getLength();
+ }
+ kvSet.clear();
+ // Statistical section
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ first = false;
+
+ keyWritable.set(rowkey);
+
+ readKeyValues(tuple, rowkey);
+ if (keyValues != null) {
+ for (KeyValue eachKeyVal: keyValues) {
+ kvSet.add(eachKeyVal);
+ }
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public long getEstimatedOutputSize() throws IOException {
+ // StoreTableExec uses this value as rolling file length
+ // Not rolling
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!kvSet.isEmpty()) {
+ try {
+ for (KeyValue kv : kvSet) {
+ writer.write(keyWritable, kv);
+ totalNumBytes += keyWritable.getLength() + kv.getLength();
+ }
+ kvSet.clear();
+ // Statistical section
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ if (enabledStats) {
+ stats.setNumBytes(totalNumBytes);
+ }
+ if (writer != null) {
+ try {
+ writer.close(writerContext);
+ committer.commitTask(writerContext);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+}
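
The appender above buffers the cells of one row key in a TreeSet ordered by KeyValue.COMPARATOR and flushes them whenever the row key changes. A standalone sketch of that ordering, with made-up row, family, and qualifier names:

    import java.util.TreeSet;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyValueOrderSketch {
      public static void main(String[] args) {
        TreeSet<KeyValue> kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
        byte[] row = Bytes.toBytes("rk-001");
        // Insertion order does not matter; the comparator sorts by row, family, qualifier.
        kvSet.add(new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes("q2"), Bytes.toBytes("v2")));
        kvSet.add(new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1")));
        for (KeyValue kv : kvSet) {
          System.out.println(Bytes.toString(kv.getQualifier())); // prints q1, then q2
        }
      }
    }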
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
new file mode 100644
index 0000000..3a58e50
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+public class IndexPredication {
+ private Column column;
+ private int columnId;
+ private Datum startValue;
+ private Datum stopValue;
+
+ public Column getColumn() {
+ return column;
+ }
+
+ public void setColumn(Column column) {
+ this.column = column;
+ }
+
+ public int getColumnId() {
+ return columnId;
+ }
+
+ public void setColumnId(int columnId) {
+ this.columnId = columnId;
+ }
+
+ public Datum getStartValue() {
+ return startValue;
+ }
+
+ public void setStartValue(Datum startValue) {
+ this.startValue = startValue;
+ }
+
+ public Datum getStopValue() {
+ return stopValue;
+ }
+
+ public void setStopValue(Datum stopValue) {
+ this.stopValue = stopValue;
+ }
+}
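
The start/stop datums carried by this class bound an HBase row-key range scan. A hedged sketch of that translation, assuming a text-encoded row key (nothing below is part of the commit):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanRangeSketch {
      public static Scan toScan(IndexPredication predication) {
        Scan scan = new Scan();
        // For text row keys, the datum's character form is the key itself.
        scan.setStartRow(Bytes.toBytes(predication.getStartValue().asChars()));
        scan.setStopRow(Bytes.toBytes(predication.getStopValue().asChars()));
        return scan;
      }
    }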
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
new file mode 100644
index 0000000..4577703
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+public class RowKeyMapping {
+ private boolean isBinary;
+ private int keyFieldIndex;
+
+ public boolean isBinary() {
+ return isBinary;
+ }
+
+ public void setBinary(boolean isBinary) {
+ this.isBinary = isBinary;
+ }
+
+ public int getKeyFieldIndex() {
+ return keyFieldIndex;
+ }
+
+ public void setKeyFieldIndex(int keyFieldIndex) {
+ this.keyFieldIndex = keyFieldIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
new file mode 100644
index 0000000..668b116
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage.hbase";
+option java_outer_classname = "StorageFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message HBaseFragmentProto {
+ required string tableName = 1;
+ required string hbaseTableName = 2;
+ required bytes startRow = 3;
+ required bytes stopRow = 4;
+ required bool last = 5;
+ required int64 length = 6;
+ optional string regionLocation = 7;
+}
\ No newline at end of file
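
Once protoc compiles this file, the generated class exposes the usual protobuf builder API. A minimal construction sketch with hypothetical field values:

    import com.google.protobuf.ByteString;
    import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto;

    HBaseFragmentProto proto = HBaseFragmentProto.newBuilder()
        .setTableName("default.table1")              // Tajo table name (hypothetical)
        .setHbaseTableName("hbase_table1")           // backing HBase table (hypothetical)
        .setStartRow(ByteString.copyFromUtf8("row-000"))
        .setStopRow(ByteString.copyFromUtf8("row-100"))
        .setLast(false)
        .setLength(1024L)
        .setRegionLocation("host1.example.com")      // optional field
        .build();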
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
new file mode 100644
index 0000000..68939d6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestColumnMapping {
+ @Test
+ public void testColumnKeyValueMapping() throws Exception {
+ KeyValueSet keyValueSet = new KeyValueSet();
+ keyValueSet.set(HBaseStorageConstants.META_TABLE_KEY, "test");
+ keyValueSet.set(HBaseStorageConstants.META_COLUMNS_KEY, ":key,col2:key:,col2:value:#b,col3:");
+
+ Schema schema = new Schema();
+ schema.addColumn("c1", Type.TEXT);
+ schema.addColumn("c2", Type.TEXT);
+ schema.addColumn("c3", Type.TEXT);
+ schema.addColumn("c4", Type.TEXT);
+
+ TableMeta tableMeta = new TableMeta(StoreType.HBASE, keyValueSet);
+
+ ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+
+ List<String> cfNames = columnMapping.getColumnFamilyNames();
+ assertEquals(2, cfNames.size());
+ assertEquals("col2", cfNames.get(0));
+ assertEquals("col3", cfNames.get(1));
+
+ for (int i = 0; i < columnMapping.getIsBinaryColumns().length; i++) {
+ if (i == 2) {
+ assertTrue(columnMapping.getIsBinaryColumns()[i]);
+ } else {
+ assertFalse(columnMapping.getIsBinaryColumns()[i]);
+ }
+ }
+
+ for (int i = 0; i < columnMapping.getIsRowKeyMappings().length; i++) {
+ if (i == 0) {
+ assertTrue(columnMapping.getIsRowKeyMappings()[i]);
+ } else {
+ assertFalse(columnMapping.getIsRowKeyMappings()[i]);
+ }
+ }
+
+ String[] expectedColumnNames = { null, null, null, null};
+ for (int i = 0; i < schema.size(); i++) {
+ String columnName = columnMapping.getMappingColumns()[i][1] == null ? null :
+ new String(columnMapping.getMappingColumns()[i][1]);
+ assertEquals(expectedColumnNames[i], columnName);
+ }
+
+ for (int i = 0; i < schema.size(); i++) {
+ if (i == 1) {
+ assertTrue(columnMapping.getIsColumnKeys()[i]);
+ } else {
+ assertFalse(columnMapping.getIsColumnKeys()[i]);
+ }
+ }
+
+ for (int i = 0; i < schema.size(); i++) {
+ if (i == 2) {
+ assertTrue(columnMapping.getIsColumnValues()[i]);
+ } else {
+ assertFalse(columnMapping.getIsColumnValues()[i]);
+ }
+ }
+ }
+}
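
Reading the mapping string ":key,col2:key:,col2:value:#b,col3:" positionally against columns c1..c4, as the assertions above imply:

    // c1 <- ":key"           the HBase row key itself
    // c2 <- "col2:key:"      qualifier names (column keys) of family col2
    // c3 <- "col2:value:#b"  cell values of family col2; "#b" marks binary encoding
    // c4 <- "col3:"          family col3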
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
new file mode 100644
index 0000000..1fc4065
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.util.Pair;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestHBaseStorageManager {
+ @Test
+ public void testGetIndexPredications() throws Exception {
+ Column rowkeyColumn = new Column("rk", Type.TEXT);
+ // where rk >= '020' and rk <= '055'
+ ScanNode scanNode = new ScanNode(1);
+ EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020")));
+ EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055")));
+ EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
+ scanNode.setQual(evalNodeA);
+
+ HBaseStorageManager storageManager =
+ (HBaseStorageManager) StorageManager.getStorageManager(new TajoConf(), StoreType.HBASE);
+ List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+ assertNotNull(indexEvals);
+ assertEquals(1, indexEvals.size());
+ Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+ assertEquals("020", indexPredicateValue.getFirst().asChars());
+ assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+ // where (rk >= '020' and rk <= '055') or rk = '075'
+ EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("075")));
+ EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
+ scanNode.setQual(evalNodeB);
+ indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+ assertEquals(2, indexEvals.size());
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+ assertEquals("020", indexPredicateValue.getFirst().asChars());
+ assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+ assertEquals("075", indexPredicateValue.getFirst().asChars());
+ assertEquals("075", indexPredicateValue.getSecond().asChars());
+
+ // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
+ EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+ EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+ EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+ EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+ scanNode.setQual(evalNodeD);
+ indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+ assertEquals(2, indexEvals.size());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+ assertEquals("020", indexPredicateValue.getFirst().asChars());
+ assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+ assertEquals("072", indexPredicateValue.getFirst().asChars());
+ assertEquals("078", indexPredicateValue.getSecond().asChars());
+
+ // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073')
+ evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+ evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+ evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+ EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073")));
+ evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6);
+ EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD);
+ scanNode.setQual(evalNodeE);
+ indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+ assertEquals(2, indexEvals.size());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+ assertEquals("020", indexPredicateValue.getFirst().asChars());
+ assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+ assertEquals("073", indexPredicateValue.getFirst().asChars());
+ assertEquals("078", indexPredicateValue.getSecond().asChars());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
new file mode 100644
index 0000000..5105ac5
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -0,0 +1,380 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright 2012 Database Lab., Korea Univ.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>tajo-project</artifactId>
+ <groupId>org.apache.tajo</groupId>
+ <version>0.9.1-SNAPSHOT</version>
+ <relativePath>../../tajo-project</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>tajo-storage-hdfs</artifactId>
+ <packaging>jar</packaging>
+ <name>Tajo HDFS Storage</name>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <parquet.version>1.5.0</parquet.version>
+ <parquet.format.version>2.1.0</parquet.format.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>repository.jboss.org</id>
+ <url>https://repository.jboss.org/nexus/content/repositories/releases/
+ </url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <excludes>
+ <exclude>src/test/resources/testVariousTypes.avsc</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <tajo.test>TRUE</tajo.test>
+ </systemProperties>
+ <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-protobuf-generated-sources-directory</id>
+ <phase>initialize</phase>
+ <configuration>
+ <target>
+ <mkdir dir="target/generated-sources/proto" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2</version>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <executable>protoc</executable>
+ <arguments>
+ <argument>-Isrc/main/proto/</argument>
+ <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+ <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+ <argument>--java_out=target/generated-sources/proto</argument>
+ <argument>src/main/proto/StorageFragmentProtos.proto</argument>
+ </arguments>
+ </configuration>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/proto</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-catalog-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-plan</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>trevni-core</artifactId>
+ <version>1.7.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>trevni-avro</artifactId>
+ <version>1.7.3</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-hs</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-format</artifactId>
+ <version>${parquet.format.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- build javadoc jars per jar for publishing to maven -->
+ <id>module-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <destDir>${project.build.directory}</destDir>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
new file mode 100644
index 0000000..4bf4c99
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.*;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class CSVFile {
+
+ public static final byte LF = '\n';
+ public static final int EOF = -1;
+
+ private static final Log LOG = LogFactory.getLog(CSVFile.class);
+
+ public static class CSVAppender extends FileAppender {
+ private final TableMeta meta;
+ private final Schema schema;
+ private final int columnNum;
+ private final FileSystem fs;
+ private FSDataOutputStream fos;
+ private DataOutputStream outputStream;
+ private CompressionOutputStream deflateFilter;
+ private char delimiter;
+ private TableStatistics stats = null;
+ private Compressor compressor;
+ private CompressionCodecFactory codecFactory;
+ private CompressionCodec codec;
+ private Path compressedPath;
+ private byte[] nullChars;
+ private static final int BUFFER_SIZE = 128 * 1024;
+ private int bufferedBytes = 0;
+ private long pos = 0;
+ private boolean isShuffle;
+
+ private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+ private SerializerDeserializer serde;
+
+ public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+ final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
+ super(conf, taskAttemptId, schema, meta, workDir);
+ this.fs = workDir.getFileSystem(conf);
+ this.meta = meta;
+ this.schema = schema;
+ this.delimiter = StringEscapeUtils.unescapeJava(
+ this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+
+ this.columnNum = schema.size();
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(
+ this.meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT));
+
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (!fs.exists(path.getParent())) {
+ throw new FileNotFoundException(path.getParent().toString());
+ }
+
+ //determine the intermediate file type
+ String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname,
+ TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal);
+ if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) {
+ isShuffle = true;
+ } else {
+ isShuffle = false;
+ }
+
+ if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+ String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+ codecFactory = new CompressionCodecFactory(conf);
+ codec = codecFactory.getCodecByClassName(codecName);
+ compressor = CodecPool.getCompressor(codec);
+ if(compressor != null) compressor.reset(); // the built-in gzip codec can return a null compressor
+
+ String extension = codec.getDefaultExtension();
+ compressedPath = path.suffix(extension);
+
+ if (fs.exists(compressedPath)) {
+ throw new AlreadyExistsStorageException(compressedPath);
+ }
+
+ fos = fs.create(compressedPath);
+ deflateFilter = codec.createOutputStream(fos, compressor);
+ outputStream = new DataOutputStream(deflateFilter);
+
+ } else {
+ if (fs.exists(path)) {
+ throw new AlreadyExistsStorageException(path);
+ }
+ fos = fs.create(path);
+ outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+ }
+
+ if (enabledStats) {
+ this.stats = new TableStatistics(this.schema);
+ }
+
+ try {
+ // This will be removed once a custom serde is added to the text file format
+ String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
+ TextSerializerDeserializer.class.getName());
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
+ os.reset();
+ pos = fos.getPos();
+ bufferedBytes = 0;
+ super.init();
+ }
+
+
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ Datum datum;
+ int rowBytes = 0;
+
+ for (int i = 0; i < columnNum; i++) {
+ datum = tuple.get(i);
+ rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
+
+ if(columnNum - 1 > i){
+ os.write((byte) delimiter);
+ rowBytes += 1;
+ }
+ if (isShuffle) {
+ // it is to calculate min/max values, and it is only used for the intermediate file.
+ stats.analyzeField(i, datum);
+ }
+ }
+ os.write(LF);
+ rowBytes += 1;
+
+ pos += rowBytes;
+ bufferedBytes += rowBytes;
+ if(bufferedBytes > BUFFER_SIZE){
+ flushBuffer();
+ }
+ // Statistical section
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
+
+ private void flushBuffer() throws IOException {
+ if(os.getLength() > 0) {
+ os.writeTo(outputStream);
+ os.reset();
+ bufferedBytes = 0;
+ }
+ }
+ @Override
+ public long getOffset() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushBuffer();
+ outputStream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ try {
+ flush();
+
+ // Statistical section
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
+
+ if(deflateFilter != null) {
+ deflateFilter.finish();
+ deflateFilter.resetState();
+ deflateFilter = null;
+ }
+
+ os.close();
+ } finally {
+ IOUtils.cleanup(LOG, fos);
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ compressor = null;
+ }
+ }
+ }
+
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+
+ public boolean isCompress() {
+ return compressor != null;
+ }
+
+ public String getExtension() {
+ return codec != null ? codec.getDefaultExtension() : "";
+ }
+ }
+
+ public static class CSVScanner extends FileScanner implements SeekableScanner {
+ public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
+ throws IOException {
+ super(conf, schema, meta, fragment);
+ factory = new CompressionCodecFactory(conf);
+ codec = factory.getCodec(this.fragment.getPath());
+ if (codec == null || codec instanceof SplittableCompressionCodec) {
+ splittable = true;
+ }
+
+ //Delimiter
+ this.delimiter = StringEscapeUtils.unescapeJava(
+ meta.getOption(StorageConstants.TEXT_DELIMITER,
+ meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).charAt(0);
+
+ String nullCharacters = StringEscapeUtils.unescapeJava(
+ meta.getOption(StorageConstants.TEXT_NULL,
+ meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT)));
+
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+ }
+
+ private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
+ private char delimiter;
+ private FileSystem fs;
+ private FSDataInputStream fis;
+ private InputStream is; // decompressed stream
+ private CompressionCodecFactory factory;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+ private Seekable filePosition;
+ private boolean splittable = false;
+ private long startOffset, end, pos;
+ private int currentIdx = 0, validIdx = 0, recordCount = 0;
+ private int[] targetColumnIndexes;
+ private boolean eof = false;
+ private final byte[] nullChars;
+ private SplitLineReader reader;
+ private ArrayList<Long> fileOffsets;
+ private ArrayList<Integer> rowLengthList;
+ private ArrayList<Integer> startOffsets;
+ private NonSyncByteArrayOutputStream buffer;
+ private SerializerDeserializer serde;
+
+ @Override
+ public void init() throws IOException {
+ fileOffsets = new ArrayList<Long>();
+ rowLengthList = new ArrayList<Integer>();
+ startOffsets = new ArrayList<Integer>();
+ buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
+
+ // FileFragment information
+ if(fs == null) {
+ fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath());
+ }
+ if(fis == null) fis = fs.open(fragment.getPath());
+
+ recordCount = 0;
+ pos = startOffset = fragment.getStartKey();
+ end = startOffset + fragment.getLength();
+
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+ fis, decompressor, startOffset, end,
+ SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+ reader = new CompressedSplitLineReader(cIn, conf, null);
+ startOffset = cIn.getAdjustedStart();
+ end = cIn.getAdjustedEnd();
+ filePosition = cIn;
+ is = cIn;
+ } else {
+ is = new DataInputStream(codec.createInputStream(fis, decompressor));
+ reader = new SplitLineReader(is, null);
+ filePosition = fis;
+ }
+ } else {
+ fis.seek(startOffset);
+ filePosition = fis;
+ is = fis;
+ reader = new SplitLineReader(is, null);
+ }
+
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
+ }
+
+ try {
+ //FIXME
+ String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
+ TextSerializerDeserializer.class.getName());
+ serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
+ super.init();
+ Arrays.sort(targetColumnIndexes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
+ "," + fs.getFileStatus(fragment.getPath()).getLen());
+ }
+
+ if (startOffset != 0) {
+ pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
+ }
+ eof = false;
+ page();
+ }
+
+ private int maxBytesToConsume(long pos) {
+ return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
+ }
+
+ private long fragmentable() throws IOException {
+ return end - getFilePosition();
+ }
+
+ private long getFilePosition() throws IOException {
+ long retVal;
+ if (isCompress()) {
+ retVal = filePosition.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
+
+ private void page() throws IOException {
+ // Index initialization
+ currentIdx = 0;
+ validIdx = 0;
+ int currentBufferPos = 0;
+ int bufferedSize = 0;
+
+ buffer.reset();
+ startOffsets.clear();
+ rowLengthList.clear();
+ fileOffsets.clear();
+
+ if(eof) {
+ return;
+ }
+
+ while (DEFAULT_PAGE_SIZE >= bufferedSize){
+
+ int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+ if(ret == 0){
+ break;
+ } else {
+ fileOffsets.add(pos);
+ pos += ret;
+ startOffsets.add(currentBufferPos);
+ currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
+ bufferedSize += ret;
+ validIdx++;
+ recordCount++;
+ }
+
+ if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
+ eof = true;
+ break;
+ }
+ }
+ if (tableStats != null) {
+ tableStats.setReadBytes(pos - startOffset);
+ tableStats.setNumRows(recordCount);
+ }
+ }
+
+ @Override
+ public float getProgress() {
+ try {
+ if(eof) {
+ return 1.0f;
+ }
+ long filePos = getFilePosition();
+ if (startOffset == filePos) {
+ return 0.0f;
+ } else {
+ long readBytes = filePos - startOffset;
+ long remainingBytes = Math.max(end - filePos, 0);
+ return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes));
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return 0.0f;
+ }
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ try {
+ if (currentIdx == validIdx) {
+ if (eof) {
+ return null;
+ } else {
+ page();
+
+ if(currentIdx == validIdx){
+ return null;
+ }
+ }
+ }
+
+ long offset = -1;
+ if(!isCompress()){
+ offset = fileOffsets.get(currentIdx);
+ }
+
+ byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
+ rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
+ currentIdx++;
+ return new LazyTuple(schema, cells, offset, nullChars, serde);
+ } catch (Throwable t) {
+ LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
+ LOG.error("Tuple list current index: " + currentIdx, t);
+ throw new IOException(t);
+ }
+ }
+
+ private boolean isCompress() {
+ return codec != null;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+
+ init();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (tableStats != null) {
+ tableStats.setReadBytes(pos - startOffset); //Actual Processed Bytes. (decompressed bytes + overhead)
+ tableStats.setNumRows(recordCount);
+ }
+
+ IOUtils.cleanup(LOG, reader, is, fis);
+ fs = null;
+ is = null;
+ fis = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSVScanner processed record:" + recordCount);
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ }
+
+ @Override
+ public void seek(long offset) throws IOException {
+ if(isCompress()) throw new UnsupportedException();
+
+ int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
+
+ if (tupleIndex > -1) {
+ this.currentIdx = tupleIndex;
+ } else if ((isSplittable() && end >= offset) || startOffset <= offset) {
+ eof = false;
+ fis.seek(offset);
+ pos = offset;
+ reader.reset();
+ this.currentIdx = 0;
+ this.validIdx = 0;
+ // pageBuffer();
+ } else {
+ throw new IOException("invalid offset " +
+ " < start : " + startOffset + " , " +
+ " end : " + end + " , " +
+ " filePos : " + filePosition.getPos() + " , " +
+ " input offset : " + offset + " >");
+ }
+ }
+
+ @Override
+ public long getNextOffset() throws IOException {
+ if(isCompress()) throw new UnsupportedException();
+
+ if (this.currentIdx == this.validIdx) {
+ if (fragmentable() <= 0) {
+ return -1;
+ } else {
+ page();
+ if(currentIdx == validIdx) return -1;
+ }
+ }
+ return fileOffsets.get(currentIdx);
+ }
+
+ @Override
+ public boolean isSplittable(){
+ return splittable;
+ }
+ }
+}
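
For orientation, a hedged sketch of driving the scanner half of this class; the conf, schema, meta, and fragment objects are assumed to be built elsewhere (e.g. a FileFragment covering one split):

    // Sketch only: inputs are assumed, not shown.
    CSVFile.CSVScanner scanner = new CSVFile.CSVScanner(conf, schema, meta, fragment);
    scanner.init();                        // seeks to the split start and pages the first buffer
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {
      // each tuple is lazily deserialized from the buffered line
      System.out.println(tuple);
    }
    scanner.close();                       // records stats and returns any decompressor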
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
new file mode 100644
index 0000000..4f58e68
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * Line reader for compressed splits
+ *
+ * Reading records from a compressed split is tricky, as the
+ * LineRecordReader is using the reported compressed input stream
+ * position directly to determine when a split has ended. In addition the
+ * compressed input stream is usually faking the actual byte position, often
+ * updating it only after the first compressed block after the split is
+ * accessed.
+ *
+ * Depending upon where the last compressed block of the split ends relative
+ * to the record delimiters it can be easy to accidentally drop the last
+ * record or duplicate the last record between this split and the next.
+ *
+ * Split end scenarios:
+ *
+ * 1) Last block of split ends in the middle of a record
+ * Nothing special that needs to be done here, since the compressed input
+ * stream will report a position after the split end once the record
+ * is fully read. The consumer of the next split will discard the
+ * partial record at the start of the split normally, and no data is lost
+ * or duplicated between the splits.
+ *
+ * 2) Last block of split ends in the middle of a delimiter
+ * The line reader will continue to consume bytes into the next block to
+ * locate the end of the delimiter. If a custom delimiter is being used
+ * then the next record must be read by this split or it will be dropped.
+ * The consumer of the next split will not recognize the partial
+ * delimiter at the beginning of its split and will discard it along with
+ * the next record.
+ *
+ * However for the default delimiter processing there is a special case
+ * because CR, LF, and CRLF are all valid record delimiters. If the
+ * block ends with a CR then the reader must peek at the next byte to see
+ * if it is an LF and therefore part of the same record delimiter.
+ * Peeking at the next byte is an access to the next block and triggers
+ * the stream to report the end of the split. There are two cases based
+ * on the next byte:
+ *
+ * A) The next byte is LF
+ * The split needs to end after the current record is returned. The
+ * consumer of the next split will discard the first record, which
+ * is degenerate since LF is itself a delimiter, and start consuming
+ * records after that byte. If the current split tries to read
+ * another record then the record will be duplicated between splits.
+ *
+ * B) The next byte is not LF
+ * The current record will be returned but the stream will report
+ * the split has ended due to the peek into the next block. If the
+ * next record is not read then it will be lost, as the consumer of
+ * the next split will discard it before processing subsequent
+ * records. Therefore the next record beyond the reported split end
+ * must be consumed by this split to avoid data loss.
+ *
+ * 3) Last block of split ends at the beginning of a delimiter
+ * This is equivalent to case 1, as the reader will consume bytes into
+ * the next block and trigger the end of the split. No further records
+ * should be read as the consumer of the next split will discard the
+ * (degenerate) record at the beginning of its split.
+ *
+ * 4) Last block of split ends at the end of a delimiter
+ * Nothing special needs to be done here. The reader will not start
+ * examining the bytes into the next block until the next record is read,
+ * so the stream will not report the end of the split just yet. Once the
+ * next record is read then the next block will be accessed and the
+ * stream will indicate the end of the split. The consumer of the next
+ * split will correctly discard the first record of its split, and no
+ * data is lost or duplicated.
+ *
+ * If the default delimiter is used and the block ends at a CR then this
+ * is treated as case 2 since the reader does not yet know without
+ * looking at subsequent bytes whether the delimiter has ended.
+ *
+ * NOTE: It is assumed that compressed input streams *never* return bytes from
+ * multiple compressed blocks from a single read. Failure to do so will
+ * violate the buffering performed by this class, as it will access
+ * bytes into the next block after the split before returning all of the
+ * records from the previous block.
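+ *
+ * A rough consumer-loop sketch (illustrative only; obtaining the
+ * SplitCompressionInputStream scin, the Configuration conf, and the
+ * maxLineLength/maxBytesToConsume limits is outside this class):
+ * <pre>
+ *   SplitLineReader reader = new CompressedSplitLineReader(scin, conf, null);
+ *   Text line = new Text();
+ *   while (scin.getAdjustedEnd() >= scin.getPos()
+ *       || reader.needAdditionalRecordAfterSplit()) {
+ *     if (reader.readLine(line, maxLineLength, maxBytesToConsume) == 0) {
+ *       break; // split fully consumed
+ *     }
+ *     // process the record in 'line'
+ *   }
+ * </pre>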
+ */
+
+public class CompressedSplitLineReader extends SplitLineReader {
+ SplitCompressionInputStream scin;
+ private boolean usingCRLF;
+ private boolean needAdditionalRecord = false;
+ private boolean finished = false;
+
+ public CompressedSplitLineReader(SplitCompressionInputStream in,
+ Configuration conf,
+ byte[] recordDelimiterBytes)
+ throws IOException {
+ super(in, conf, recordDelimiterBytes);
+ scin = in;
+ usingCRLF = (recordDelimiterBytes == null);
+ }
+
+ @Override
+ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+ throws IOException {
+ int bytesRead = in.read(buffer);
+
+ // If the split ended in the middle of a record delimiter then we need
+ // to read one additional record, as the consumer of the next split will
+ // not recognize the partial delimiter as a record.
+ // However if using the default delimiter and the next character is a
+ // linefeed then next split will treat it as a delimiter all by itself
+ // and the additional record read should not be performed.
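+ // For example, when the split ends between '\r' and '\n': the '\n' read
+ // here completes a single "\r\n" delimiter, and the consumer of the next
+ // split will discard the degenerate empty record it sees at that '\n',
+ // so reading an additional record here would duplicate it (scenario 2A
+ // in the class comment).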
+ if (inDelimiter && bytesRead > 0) {
+ if (usingCRLF) {
+ needAdditionalRecord = (buffer[0] != '\n');
+ } else {
+ needAdditionalRecord = true;
+ }
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+ throws IOException {
+ int bytesRead = 0;
+ if (!finished) {
+ // only allow at most one more record to be read after the stream
+ // reports the split ended
+ if (scin.getPos() > scin.getAdjustedEnd()) {
+ finished = true;
+ }
+
+ bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
+ }
+ return bytesRead;
+ }
+
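+ /**
+ * Variant of readLine(Text, ...) with the same split-end handling,
+ * appending the bytes to the given output stream and collecting
+ * per-record offsets instead of filling a Text object.
+ */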
+ @Override
+ public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength,
+ int maxBytesToConsume) throws IOException {
+ int bytesRead = 0;
+ if (!finished) {
+ // only allow at most one more record to be read after the stream
+ // reports the split ended
+ if (scin.getPos() > scin.getAdjustedEnd()) {
+ finished = true;
+ }
+
+ bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
+ }
+ return bytesRead;
+ }
+
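+ /**
+ * True while the split ended in the middle of a record delimiter and the
+ * extra record beyond the reported split end (scenario 2 in the class
+ * comment) has not yet been consumed.
+ */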
+ @Override
+ public boolean needAdditionalRecordAfterSplit() {
+ return !finished && needAdditionalRecord;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
new file mode 100644
index 0000000..47f67c6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+
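+/**
+ * Base class for appenders that write rows out to a file. A rough
+ * lifecycle sketch (illustrative; addTuple()/flush()/close() come from the
+ * Appender interface, and how the concrete subclass is obtained is an
+ * assumption here):
+ * <pre>
+ *   FileAppender appender = ...; // concrete subclass from the storage layer
+ *   appender.enableStats();      // optional, but must precede init()
+ *   appender.init();
+ *   // addTuple() for each row, then flush() and close()
+ * </pre>
+ */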
+public abstract class FileAppender implements Appender {
+ private static final Log LOG = LogFactory.getLog(FileAppender.class);
+
+ protected boolean inited = false;
+
+ protected final Configuration conf;
+ protected final TableMeta meta;
+ protected final Schema schema;
+ protected final Path workDir;
+ protected final QueryUnitAttemptId taskAttemptId;
+
+ protected boolean enabledStats;
+ protected Path path;
+
+ public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema,
+ TableMeta meta, Path workDir) {
+ this.conf = conf;
+ this.meta = meta;
+ this.schema = schema;
+ this.workDir = workDir;
+ this.taskAttemptId = taskAttemptId;
+
+ try {
+ if (taskAttemptId != null) {
+ this.path = ((FileStorageManager)StorageManager.getFileStorageManager((TajoConf) conf))
+ .getAppenderFilePath(taskAttemptId, workDir);
+ } else {
+ this.path = workDir;
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ throw new IllegalStateException("Error while opeining FileAppender: " + e.getMessage(), e);
+ }
+ }
+
+ public void init() throws IOException {
+ if (inited) {
+ throw new IllegalStateException("FileAppender is already initialized.");
+ }
+ inited = true;
+ }
+
+ public void enableStats() {
+ if (inited) {
+ throw new IllegalStateException("Should enable this option before init()");
+ }
+
+ this.enabledStats = true;
+ }
+
+ public long getEstimatedOutputSize() throws IOException {
+ return getOffset();
+ }
+
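+ /**
+ * Current write offset in the output, in bytes; getEstimatedOutputSize()
+ * uses this as its default estimate.
+ */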
+ public abstract long getOffset() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
new file mode 100644
index 0000000..038f0f4
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+
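+/**
+ * Base class for scanners that read rows from a single file fragment. A
+ * rough lifecycle sketch (illustrative; next() and close() come from the
+ * Scanner interface, and how the concrete subclass is obtained is an
+ * assumption here):
+ * <pre>
+ *   FileScanner scanner = ...; // concrete subclass from the storage layer
+ *   scanner.setTarget(targetColumns); // must precede init()
+ *   scanner.init();
+ *   // read tuples with next() until exhausted, then close()
+ * </pre>
+ */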
+public abstract class FileScanner implements Scanner {
+ private static final Log LOG = LogFactory.getLog(FileScanner.class);
+
+ protected boolean inited = false;
+ protected final Configuration conf;
+ protected final TableMeta meta;
+ protected final Schema schema;
+ protected final FileFragment fragment;
+ protected final int columnNum;
+
+ protected Column [] targets;
+
+ protected float progress;
+
+ protected TableStats tableStats;
+
+ public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
+ this.conf = conf;
+ this.meta = meta;
+ this.schema = schema;
+ this.fragment = (FileFragment)fragment;
+ this.tableStats = new TableStats();
+ this.columnNum = this.schema.size();
+ }
+
+ public void init() throws IOException {
+ inited = true;
+ progress = 0.0f;
+
+ if (fragment != null) {
+ tableStats.setNumBytes(fragment.getLength());
+ tableStats.setNumBlocks(1);
+ }
+
+ if (schema != null) {
+ for(Column eachColumn: schema.getColumns()) {
+ ColumnStats columnStats = new ColumnStats(eachColumn);
+ tableStats.addColumnStat(columnStats);
+ }
+ }
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public void setTarget(Column[] targets) {
+ if (inited) {
+ throw new IllegalStateException("Should be called before init()");
+ }
+ this.targets = targets;
+ }
+
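+ /**
+ * No-op hook; subclasses that can push down a search condition override
+ * this. Must be called before init().
+ */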
+ public void setSearchCondition(Object expr) {
+ if (inited) {
+ throw new IllegalStateException("Should be called before init()");
+ }
+ }
+
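+ /**
+ * Returns the FileSystem for the given path, opened as the configured
+ * Tajo user when one is set; falls back to the caller's default identity
+ * if the proxy-user lookup is interrupted.
+ */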
+ public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
+ String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
+ FileSystem fs;
+ if(tajoUser != null) {
+ try {
+ fs = FileSystem.get(path.toUri(), tajoConf, tajoUser);
+ } catch (InterruptedException e) {
+ LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]");
+ fs = FileSystem.get(path.toUri(), tajoConf);
+ }
+ } else {
+ fs = FileSystem.get(path.toUri(), tajoConf);
+ }
+
+ return fs;
+ }
+
+ @Override
+ public float getProgress() {
+ return progress;
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ return tableStats;
+ }
+}