You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:32 UTC
[18/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
new file mode 100644
index 0000000..2674511
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.hbase.StorageFragmentProtos.*;
+
+public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
+ @Expose
+ private String tableName;
+ @Expose
+ private String hbaseTableName;
+ @Expose
+ private byte[] startRow;
+ @Expose
+ private byte[] stopRow;
+ @Expose
+ private String regionLocation;
+ @Expose
+ private boolean last;
+ @Expose
+ private long length;
+
+ public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) {
+ this.tableName = tableName;
+ this.hbaseTableName = hbaseTableName;
+ this.startRow = startRow;
+ this.stopRow = stopRow;
+ this.regionLocation = regionLocation;
+ this.last = false;
+ }
+
+ public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException {
+ HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
+ builder.mergeFrom(raw);
+ builder.build();
+ init(builder.build());
+ }
+
+ private void init(HBaseFragmentProto proto) {
+ this.tableName = proto.getTableName();
+ this.hbaseTableName = proto.getHbaseTableName();
+ this.startRow = proto.getStartRow().toByteArray();
+ this.stopRow = proto.getStopRow().toByteArray();
+ this.regionLocation = proto.getRegionLocation();
+ this.length = proto.getLength();
+ this.last = proto.getLast();
+ }
+
+ @Override
+ public int compareTo(HBaseFragment t) {
+ return Bytes.compareTo(startRow, t.startRow);
+ }
+
+ @Override
+ public String getTableName() {
+ return tableName;
+ }
+
+ @Override
+ public String getKey() {
+ return new String(startRow);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return startRow == null || stopRow == null;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ public void setLength(long length) {
+ this.length = length;
+ }
+
+ @Override
+ public String[] getHosts() {
+ return new String[] {regionLocation};
+ }
+
+ public Object clone() throws CloneNotSupportedException {
+ HBaseFragment frag = (HBaseFragment) super.clone();
+ frag.tableName = tableName;
+ frag.hbaseTableName = hbaseTableName;
+ frag.startRow = startRow;
+ frag.stopRow = stopRow;
+ frag.regionLocation = regionLocation;
+ frag.last = last;
+ frag.length = length;
+ return frag;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof HBaseFragment) {
+ HBaseFragment t = (HBaseFragment) o;
+ if (tableName.equals(t.tableName)
+ && Bytes.equals(startRow, t.startRow)
+ && Bytes.equals(stopRow, t.stopRow)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow);
+ }
+
+ @Override
+ public String toString() {
+ return "\"fragment\": {\"tableName\": \""+ tableName + "\", hbaseTableName\": \"" + hbaseTableName + "\"" +
+ ", \"startRow\": \"" + new String(startRow) + "\"" +
+ ", \"stopRow\": \"" + new String(stopRow) + "\"" +
+ ", \"length\": \"" + length + "\"}" ;
+ }
+
+ @Override
+ public FragmentProto getProto() {
+ HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
+ builder.setTableName(tableName)
+ .setHbaseTableName(hbaseTableName)
+ .setStartRow(ByteString.copyFrom(startRow))
+ .setStopRow(ByteString.copyFrom(stopRow))
+ .setLast(last)
+ .setLength(length)
+ .setRegionLocation(regionLocation);
+
+ FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
+ fragmentBuilder.setId(this.tableName);
+ fragmentBuilder.setContents(builder.buildPartial().toByteString());
+ fragmentBuilder.setStoreType(StoreType.HBASE.name());
+ return fragmentBuilder.build();
+ }
+
+ public byte[] getStartRow() {
+ return startRow;
+ }
+
+ public byte[] getStopRow() {
+ return stopRow;
+ }
+
+ public String getRegionLocation() {
+ return regionLocation;
+ }
+
+ public boolean isLast() {
+ return last;
+ }
+
+ public void setLast(boolean last) {
+ this.last = last;
+ }
+
+ public String getHbaseTableName() {
+ return hbaseTableName;
+ }
+
+ public void setHbaseTableName(String hbaseTableName) {
+ this.hbaseTableName = hbaseTableName;
+ }
+
+ public void setStartRow(byte[] startRow) {
+ this.startRow = startRow;
+ }
+
+ public void setStopRow(byte[] stopRow) {
+ this.stopRow = stopRow;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
new file mode 100644
index 0000000..50f61a8
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class HBasePutAppender extends AbstractHBaseAppender {
+ private HTableInterface htable;
+ private long totalNumBytes;
+
+ public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ Schema schema, TableMeta meta, Path stagingDir) {
+ super(conf, taskAttemptId, schema, meta, stagingDir);
+ }
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+
+ Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
+ HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, StoreType.HBASE))
+ .getConnection(hbaseConf);
+ htable = hconn.getTable(columnMapping.getHbaseTableName());
+ htable.setAutoFlushTo(false);
+ htable.setWriteBufferSize(5 * 1024 * 1024);
+ }
+
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ byte[] rowkey = getRowKeyBytes(tuple);
+ totalNumBytes += rowkey.length;
+ Put put = new Put(rowkey);
+ readKeyValues(tuple, rowkey);
+
+ for (int i = 0; i < columnNum; i++) {
+ if (isRowKeyMappings[i]) {
+ continue;
+ }
+ Datum datum = tuple.get(i);
+ byte[] value;
+ if (isBinaryColumns[i]) {
+ value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+ } else {
+ value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+ }
+
+ if (isColumnKeys[i]) {
+ columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
+ } else if (isColumnValues[i]) {
+ columnValueDatas[columnKeyValueDataIndexes[i]] = value;
+ } else {
+ put.add(mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
+ totalNumBytes += value.length;
+ }
+ }
+
+ for (int i = 0; i < columnKeyDatas.length; i++) {
+ put.add(columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
+ totalNumBytes += columnKeyDatas[i].length + columnValueDatas[i].length;
+ }
+
+ htable.put(put);
+
+ if (enabledStats) {
+ stats.incrementRow();
+ stats.setNumBytes(totalNumBytes);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ htable.flushCommits();
+ }
+
+ @Override
+ public long getEstimatedOutputSize() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (htable != null) {
+ htable.flushCommits();
+ htable.close();
+ }
+ if (enabledStats) {
+ stats.setNumBytes(totalNumBytes);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
new file mode 100644
index 0000000..5cae077
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HBaseScanner implements Scanner {
+ private static final Log LOG = LogFactory.getLog(HBaseScanner.class);
+ private static final int DEFAULT_FETCH_SIZE = 1000;
+ private static final int MAX_LIST_SIZE = 100;
+
+ protected boolean inited = false;
+ private TajoConf conf;
+ private Schema schema;
+ private TableMeta meta;
+ private HBaseFragment fragment;
+ private Scan scan;
+ private HTableInterface htable;
+ private Configuration hbaseConf;
+ private Column[] targets;
+ private TableStats tableStats;
+ private ResultScanner scanner;
+ private AtomicBoolean finished = new AtomicBoolean(false);
+ private float progress = 0.0f;
+ private int scanFetchSize;
+ private Result[] scanResults;
+ private int scanResultIndex = -1;
+ private Column[] schemaColumns;
+
+ private ColumnMapping columnMapping;
+ private int[] targetIndexes;
+
+ private int numRows = 0;
+
+ private byte[][][] mappingColumnFamilies;
+ private boolean[] isRowKeyMappings;
+ private boolean[] isBinaryColumns;
+ private boolean[] isColumnKeys;
+ private boolean[] isColumnValues;
+
+ private int[] rowKeyFieldIndexes;
+ private char rowKeyDelimiter;
+
+ public HBaseScanner (Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
+ this.conf = (TajoConf)conf;
+ this.schema = schema;
+ this.meta = meta;
+ this.fragment = (HBaseFragment)fragment;
+ this.tableStats = new TableStats();
+ }
+
+ @Override
+ public void init() throws IOException {
+ inited = true;
+ schemaColumns = schema.toArray();
+ if (fragment != null) {
+ tableStats.setNumBytes(0);
+ tableStats.setNumBlocks(1);
+ }
+ if (schema != null) {
+ for(Column eachColumn: schema.getColumns()) {
+ ColumnStats columnStats = new ColumnStats(eachColumn);
+ tableStats.addColumnStat(columnStats);
+ }
+ }
+
+ scanFetchSize = Integer.parseInt(
+ meta.getOption(HBaseStorageConstants.META_FETCH_ROWNUM_KEY, "" + DEFAULT_FETCH_SIZE));
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ columnMapping = new ColumnMapping(schema, meta);
+ targetIndexes = new int[targets.length];
+ int index = 0;
+ for (Column eachTargetColumn: targets) {
+ targetIndexes[index++] = schema.getColumnId(eachTargetColumn.getQualifiedName());
+ }
+
+ mappingColumnFamilies = columnMapping.getMappingColumns();
+ isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+ isBinaryColumns = columnMapping.getIsBinaryColumns();
+ isColumnKeys = columnMapping.getIsColumnKeys();
+ isColumnValues = columnMapping.getIsColumnValues();
+
+ rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
+ rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
+
+ hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
+
+ initScanner();
+ }
+
+ private void initScanner() throws IOException {
+ scan = new Scan();
+ scan.setBatch(scanFetchSize);
+ scan.setCacheBlocks(false);
+ scan.setCaching(scanFetchSize);
+
+ FilterList filters = null;
+ if (targetIndexes == null || targetIndexes.length == 0) {
+ filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+ filters.addFilter(new FirstKeyOnlyFilter());
+ filters.addFilter(new KeyOnlyFilter());
+ } else {
+ boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+ for (int eachIndex : targetIndexes) {
+ if (isRowKeyMappings[eachIndex]) {
+ continue;
+ }
+ byte[][] mappingColumn = columnMapping.getMappingColumns()[eachIndex];
+ if (mappingColumn[1] == null) {
+ scan.addFamily(mappingColumn[0]);
+ } else {
+ scan.addColumn(mappingColumn[0], mappingColumn[1]);
+ }
+ }
+ }
+
+ scan.setStartRow(fragment.getStartRow());
+ if (fragment.isLast() && fragment.getStopRow() != null &&
+ fragment.getStopRow().length > 0) {
+ // last and stopRow is not empty
+ if (filters == null) {
+ filters = new FilterList();
+ }
+ filters.addFilter(new InclusiveStopFilter(fragment.getStopRow()));
+ } else {
+ scan.setStopRow(fragment.getStopRow());
+ }
+
+ if (filters != null) {
+ scan.setFilter(filters);
+ }
+
+ if (htable == null) {
+ HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+ .getConnection(hbaseConf);
+ htable = hconn.getTable(fragment.getHbaseTableName());
+ }
+ scanner = htable.getScanner(scan);
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ if (finished.get()) {
+ return null;
+ }
+
+ if (scanResults == null || scanResultIndex >= scanResults.length) {
+ scanResults = scanner.next(scanFetchSize);
+ if (scanResults == null || scanResults.length == 0) {
+ finished.set(true);
+ progress = 1.0f;
+ return null;
+ }
+ scanResultIndex = 0;
+ }
+
+ Result result = scanResults[scanResultIndex++];
+ Tuple resultTuple = new VTuple(schema.size());
+ for (int i = 0; i < targetIndexes.length; i++) {
+ resultTuple.put(targetIndexes[i], getDatum(result, targetIndexes[i]));
+ }
+ numRows++;
+ return resultTuple;
+ }
+
+ private Datum getDatum(Result result, int fieldId) throws IOException {
+ byte[] value = null;
+ if (isRowKeyMappings[fieldId]) {
+ value = result.getRow();
+ if (!isBinaryColumns[fieldId] && rowKeyFieldIndexes[fieldId] >= 0) {
+ int rowKeyFieldIndex = rowKeyFieldIndexes[fieldId];
+
+ byte[][] rowKeyFields = BytesUtils.splitPreserveAllTokens(value, rowKeyDelimiter);
+
+ if (rowKeyFields.length < rowKeyFieldIndex) {
+ return NullDatum.get();
+ } else {
+ value = rowKeyFields[rowKeyFieldIndex];
+ }
+ }
+ } else {
+ if (isColumnKeys[fieldId]) {
+ NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
+ if (cfMap != null) {
+ Set<byte[]> keySet = cfMap.keySet();
+ if (keySet.size() == 1) {
+ try {
+ return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], keySet.iterator().next());
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ int count = 0;
+ for (byte[] eachKey : keySet) {
+ if (count > 0) {
+ sb.append(", ");
+ }
+ Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], eachKey);
+ sb.append("\"").append(datum.asChars()).append("\"");
+ count++;
+ if (count > MAX_LIST_SIZE) {
+ break;
+ }
+ }
+ sb.append("]");
+ return new TextDatum(sb.toString());
+ }
+ }
+ } else if (isColumnValues[fieldId]) {
+ NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
+ if (cfMap != null) {
+ Collection<byte[]> valueList = cfMap.values();
+ if (valueList.size() == 1) {
+ try {
+ return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], valueList.iterator().next());
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ int count = 0;
+ for (byte[] eachValue : valueList) {
+ if (count > 0) {
+ sb.append(", ");
+ }
+ Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], eachValue);
+ sb.append("\"").append(datum.asChars()).append("\"");
+ count++;
+ if (count > MAX_LIST_SIZE) {
+ break;
+ }
+ }
+ sb.append("]");
+ return new TextDatum(sb.toString());
+ }
+ }
+ } else {
+ if (mappingColumnFamilies[fieldId][1] == null) {
+ NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
+ if (cfMap != null && !cfMap.isEmpty()) {
+ int count = 0;
+ String delim = "";
+
+ if (cfMap.size() == 0) {
+ return NullDatum.get();
+ } else if (cfMap.size() == 1) {
+ // If a column family is mapped without column name like "cf1:" and the number of cells is one,
+ // return value is flat format not json format.
+ NavigableMap.Entry<byte[], byte[]> entry = cfMap.entrySet().iterator().next();
+ byte[] entryKey = entry.getKey();
+ byte[] entryValue = entry.getValue();
+ if (entryKey == null || entryKey.length == 0) {
+ try {
+ if (isBinaryColumns[fieldId]) {
+ return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue);
+ } else {
+ return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue);
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ for (NavigableMap.Entry<byte[], byte[]> entry : cfMap.entrySet()) {
+ byte[] entryKey = entry.getKey();
+ byte[] entryValue = entry.getValue();
+
+ String keyText = new String(entryKey);
+ String valueText = null;
+ if (entryValue != null) {
+ try {
+ if (isBinaryColumns[fieldId]) {
+ valueText = HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue).asChars();
+ } else {
+ valueText = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue).asChars();
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ sb.append(delim).append("\"").append(keyText).append("\":\"").append(valueText).append("\"");
+ delim = ", ";
+ count++;
+ if (count > MAX_LIST_SIZE) {
+ break;
+ }
+ } //end of for
+ sb.append("}");
+ return new TextDatum(sb.toString());
+ } else {
+ value = null;
+ }
+ } else {
+ value = result.getValue(mappingColumnFamilies[fieldId][0], mappingColumnFamilies[fieldId][1]);
+ }
+ }
+ }
+
+ if (value == null) {
+ return NullDatum.get();
+ } else {
+ try {
+ if (isBinaryColumns[fieldId]) {
+ return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], value);
+ } else {
+ return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], value);
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ progress = 0.0f;
+ scanResultIndex = -1;
+ scanResults = null;
+ finished.set(false);
+ tableStats = new TableStats();
+
+ if (scanner != null) {
+ scanner.close();
+ scanner = null;
+ }
+
+ initScanner();
+ }
+
+ @Override
+ public void close() throws IOException {
+ progress = 1.0f;
+ finished.set(true);
+ if (scanner != null) {
+ try {
+ scanner.close();
+ scanner = null;
+ } catch (Exception e) {
+ LOG.warn("Error while closing hbase scanner: " + e.getMessage(), e);
+ }
+ }
+ if (htable != null) {
+ htable.close();
+ htable = null;
+ }
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public void setTarget(Column[] targets) {
+ if (inited) {
+ throw new IllegalStateException("Should be called before init()");
+ }
+ this.targets = targets;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ // TODO implements adding column filter to scanner.
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return true;
+ }
+
+ @Override
+ public float getProgress() {
+ return progress;
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ tableStats.setNumRows(numRows);
+ return tableStats;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
new file mode 100644
index 0000000..2c525a1
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
/**
 * Keys of the table options (meta properties) understood by the HBase storage
 * plugin, plus the special column-mapping tokens.
 */
public interface HBaseStorageConstants {
  // Column-mapping tokens: map a Tajo column to the qualifier ("key") or the
  // cell value ("value") side of a column family.
  // Note: interface fields are implicitly public static final, so the
  // redundant modifiers from the original are dropped.
  String KEY_COLUMN_MAPPING = "key";
  String VALUE_COLUMN_MAPPING = "value";

  // Table meta option keys.
  String META_FETCH_ROWNUM_KEY = "fetch.rownum";
  String META_TABLE_KEY = "table";
  String META_COLUMNS_KEY = "columns";
  String META_SPLIT_ROW_KEYS_KEY = "hbase.split.rowkeys";
  String META_SPLIT_ROW_KEYS_FILE_KEY = "hbase.split.rowkeys.file";
  String META_ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
  String META_ROWKEY_DELIMITER = "hbase.rowkey.delimiter";

  // Option selecting Put-based INSERT instead of HFile bulk load.
  String INSERT_PUT_MODE = "tajo.hbase.insert.put.mode";
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
new file mode 100644
index 0000000..a6e7a81
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
@@ -0,0 +1,1135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.CreateTableNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.*;
+
+/**
+ * StorageManager for HBase table.
+ */
+public class HBaseStorageManager extends StorageManager {
+ private static final Log LOG = LogFactory.getLog(HBaseStorageManager.class);
+
+ // Cache of HBase cluster connections keyed by the connection-relevant
+ // configuration properties (see HConnectionKey). All access is guarded by
+ // synchronizing on connMap itself.
+ private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>();
+
+ // Constructed by the StorageManager framework for the HBASE store type.
+ public HBaseStorageManager (StoreType storeType) {
+ super(storeType);
+ }
+
+ @Override
+ public void storageInit() throws IOException {
+ // No eager initialization is needed for HBase; connections are created
+ // lazily in getConnection().
+ }
+
+ @Override
+ public void closeStorageManager() {
+ synchronized (connMap) {
+ for (HConnection eachConn: connMap.values()) {
+ try {
+ eachConn.close();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
+ createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists);
+ TableStats stats = new TableStats();
+ stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+ tableDesc.setStats(stats);
+ }
+
+ /**
+ * Creates or validates the HBase table backing a Tajo table.
+ *
+ * For an external table only validation is performed: the HBase table and
+ * every mapped column family must already exist. For a managed table the
+ * HBase table is created, optionally pre-split with keys from the meta.
+ */
+ private void createTable(TableMeta tableMeta, Schema schema,
+ boolean isExternal, boolean ifNotExists) throws IOException {
+ String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, "");
+ if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) {
+ throw new IOException("HBase mapped table is required a '" +
+ HBaseStorageConstants.META_TABLE_KEY + "' attribute.");
+ }
+ TableName hTableName = TableName.valueOf(hbaseTableName);
+
+ String mappedColumns = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
+ if (mappedColumns != null && mappedColumns.split(",").length > schema.size()) {
+ throw new IOException("Columns property has more entry than Tajo table columns");
+ }
+
+ // Count the columns mapped to the HBase row key.
+ ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+ int numRowKeys = 0;
+ boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+ for (int i = 0; i < isRowKeyMappings.length; i++) {
+ if (isRowKeyMappings[i]) {
+ numRowKeys++;
+ }
+ }
+ // A composite row key is built from concatenated text, so every part of
+ // it must be TEXT.
+ if (numRowKeys > 1) {
+ for (int i = 0; i < isRowKeyMappings.length; i++) {
+ if (isRowKeyMappings[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) {
+ throw new IOException("Key field type should be TEXT type.");
+ }
+ }
+ }
+
+ // Columns mapped to '<cfname>:key:' or '<cfname>:value:' must be TEXT too.
+ for (int i = 0; i < isRowKeyMappings.length; i++) {
+ if (columnMapping.getIsColumnKeys()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) {
+ throw new IOException("Column key field('<cfname>:key:') type should be TEXT type.");
+ }
+ if (columnMapping.getIsColumnValues()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) {
+ throw new IOException("Column value field(('<cfname>:value:') type should be TEXT type.");
+ }
+ }
+
+ Configuration hConf = getHBaseConfiguration(conf, tableMeta);
+ HBaseAdmin hAdmin = new HBaseAdmin(hConf);
+
+ try {
+ if (isExternal) {
+ // External Tajo table: do not create anything, only validate.
+ if (mappedColumns == null || mappedColumns.isEmpty()) {
+ throw new IOException("HBase mapped table is required a '" +
+ HBaseStorageConstants.META_COLUMNS_KEY + "' attribute.");
+ }
+ if (!hAdmin.tableExists(hTableName)) {
+ throw new IOException("HBase table [" + hbaseTableName + "] not exists. " +
+ "External table should be a existed table.");
+ }
+ HTableDescriptor hTableDescriptor = hAdmin.getTableDescriptor(hTableName);
+ Set<String> tableColumnFamilies = new HashSet<String>();
+ for (HColumnDescriptor eachColumn : hTableDescriptor.getColumnFamilies()) {
+ tableColumnFamilies.add(eachColumn.getNameAsString());
+ }
+
+ // Every column family referenced by the mapping must exist in HBase.
+ Collection<String> mappingColumnFamilies =columnMapping.getColumnFamilyNames();
+ if (mappingColumnFamilies.isEmpty()) {
+ throw new IOException("HBase mapped table is required a '" +
+ HBaseStorageConstants.META_COLUMNS_KEY + "' attribute.");
+ }
+
+ for (String eachMappingColumnFamily : mappingColumnFamilies) {
+ if (!tableColumnFamilies.contains(eachMappingColumnFamily)) {
+ throw new IOException("There is no " + eachMappingColumnFamily + " column family in " + hbaseTableName);
+ }
+ }
+ } else {
+ if (hAdmin.tableExists(hbaseTableName)) {
+ if (ifNotExists) {
+ // CREATE TABLE IF NOT EXISTS: silently reuse the existing table.
+ return;
+ } else {
+ throw new IOException("HBase table [" + hbaseTableName + "] already exists.");
+ }
+ }
+ // Creating hbase table
+ HTableDescriptor hTableDescriptor = parseHTableDescriptor(tableMeta, schema);
+
+ // Pre-split the new table when split keys are configured in the meta.
+ byte[][] splitKeys = getSplitKeys(conf, schema, tableMeta);
+ if (splitKeys == null) {
+ hAdmin.createTable(hTableDescriptor);
+ } else {
+ hAdmin.createTable(hTableDescriptor, splitKeys);
+ }
+ }
+ } finally {
+ hAdmin.close();
+ }
+ }
+
+ /**
+ * Returns initial region split keys.
+ *
+ * Split keys come either from the inline 'hbase.split.rowkeys' option or,
+ * for a large number of keys, from the file named by
+ * 'hbase.split.rowkeys.file' (one key per line, deduplicated and sorted).
+ *
+ * @param conf configuration used to resolve the split-key file
+ * @param schema table schema, used to locate the row-key column
+ * @param meta table meta holding the split-key options
+ * @return serialized split keys, or null when no split option is set
+ * @throws java.io.IOException
+ */
+ private byte[][] getSplitKeys(TajoConf conf, Schema schema, TableMeta meta) throws IOException {
+ String splitRowKeys = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_KEY, "");
+ String splitRowKeysFile = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_FILE_KEY, "");
+
+ if ((splitRowKeys == null || splitRowKeys.isEmpty()) &&
+ (splitRowKeysFile == null || splitRowKeysFile.isEmpty())) {
+ return null;
+ }
+
+ ColumnMapping columnMapping = new ColumnMapping(schema, meta);
+ boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns();
+ boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
+
+ // Locate the row-key column and whether it is binary-encoded.
+ boolean rowkeyBinary = false;
+ int numRowKeys = 0;
+ Column rowKeyColumn = null;
+ for (int i = 0; i < isBinaryColumns.length; i++) {
+ if (isBinaryColumns[i] && isRowKeys[i]) {
+ rowkeyBinary = true;
+ }
+ if (isRowKeys[i]) {
+ numRowKeys++;
+ rowKeyColumn = schema.getColumn(i);
+ }
+ }
+
+ if (rowkeyBinary && numRowKeys > 1) {
+ throw new IOException("If rowkey is mapped to multi column and a rowkey is binary, " +
+ "Multiple region for creation is not support.");
+ }
+
+ if (splitRowKeys != null && !splitRowKeys.isEmpty()) {
+ String[] splitKeyTokens = splitRowKeys.split(",");
+ byte[][] splitKeys = new byte[splitKeyTokens.length][];
+ for (int i = 0; i < splitKeyTokens.length; i++) {
+ if (numRowKeys == 1 && rowkeyBinary) {
+ splitKeys[i] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i]));
+ } else {
+ splitKeys[i] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i]));
+ }
+ }
+ return splitKeys;
+ }
+
+ if (splitRowKeysFile != null && !splitRowKeysFile.isEmpty()) {
+ // When there are many split keys, they may be listed in a file instead.
+ Path path = new Path(splitRowKeysFile);
+ FileSystem fs = path.getFileSystem(conf);
+ if (!fs.exists(path)) {
+ throw new IOException("hbase.split.rowkeys.file=" + path.toString() + " not exists.");
+ }
+
+ // TreeSet deduplicates and sorts the keys as createTable expects.
+ SortedSet<String> splitKeySet = new TreeSet<String>();
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(fs.open(path)));
+ String line = null;
+ while ( (line = reader.readLine()) != null ) {
+ if (line.isEmpty()) {
+ continue;
+ }
+ splitKeySet.add(line);
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ if (splitKeySet.isEmpty()) {
+ return null;
+ }
+
+ byte[][] splitKeys = new byte[splitKeySet.size()][];
+ int index = 0;
+ for (String eachKey: splitKeySet) {
+ if (numRowKeys == 1 && rowkeyBinary) {
+ splitKeys[index++] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey));
+ } else {
+ splitKeys[index++] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey));
+ }
+ }
+
+ return splitKeys;
+ }
+
+ return null;
+ }
+
+ /**
+ * Creates Configuration instance and sets with hbase connection options.
+ *
+ * @param conf
+ * @param tableMeta
+ * @return
+ * @throws java.io.IOException
+ */
+ public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException {
+ String zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, "");
+ if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
+ throw new IOException("HBase mapped table is required a '" +
+ HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute.");
+ }
+
+ Configuration hbaseConf = (conf == null) ? HBaseConfiguration.create() : HBaseConfiguration.create(conf);
+ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
+
+ for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) {
+ String key = eachOption.getKey();
+ if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
+ hbaseConf.set(key, eachOption.getValue());
+ }
+ }
+ return hbaseConf;
+ }
+
+ /**
+ * Creates HTableDescription using table meta data.
+ *
+ * @param tableMeta
+ * @param schema
+ * @return
+ * @throws java.io.IOException
+ */
+ public static HTableDescriptor parseHTableDescriptor(TableMeta tableMeta, Schema schema) throws IOException {
+ String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, "");
+ if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) {
+ throw new IOException("HBase mapped table is required a '" +
+ HBaseStorageConstants.META_TABLE_KEY + "' attribute.");
+ }
+ TableName hTableName = TableName.valueOf(hbaseTableName);
+
+ ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+
+ HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName);
+
+ Collection<String> columnFamilies = columnMapping.getColumnFamilyNames();
+ //If 'columns' attribute is empty, Tajo table columns are mapped to all HBase table column.
+ if (columnFamilies.isEmpty()) {
+ for (Column eachColumn: schema.getColumns()) {
+ columnFamilies.add(eachColumn.getSimpleName());
+ }
+ }
+
+ for (String eachColumnFamily: columnFamilies) {
+ hTableDescriptor.addFamily(new HColumnDescriptor(eachColumnFamily));
+ }
+
+ return hTableDescriptor;
+ }
+
+ @Override
+ public void purgeTable(TableDesc tableDesc) throws IOException {
+ HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta()));
+
+ try {
+ HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema());
+ LOG.info("Deleting hbase table: " + new String(hTableDesc.getName()));
+ hAdmin.disableTable(hTableDesc.getName());
+ hAdmin.deleteTable(hTableDesc.getName());
+ } finally {
+ hAdmin.close();
+ }
+ }
+
+ /**
+ * Returns columns which are mapped to the rowkey of the hbase table.
+ *
+ * Only the single row-key column, or the first component of a composite
+ * row key, is considered indexable.
+ *
+ * @param tableDesc
+ * @return a single-element array; NOTE(review): the element may be null
+ * when no column is mapped to the row key — callers appear to rely on
+ * length checks only, verify before changing this contract.
+ * @throws java.io.IOException
+ */
+ private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException {
+ ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+ boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+ int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes();
+
+ Column indexColumn = null;
+ for (int i = 0; i < isRowKeyMappings.length; i++) {
+ if (isRowKeyMappings[i]) {
+ // Use the column when it is the whole row key or its first component.
+ if (columnMapping.getNumRowKeys() == 1 ||
+ rowKeyIndexes[i] == 0) {
+ indexColumn = tableDesc.getSchema().getColumn(i);
+ }
+ }
+ }
+ return new Column[]{indexColumn};
+ }
+
+ /**
+ * Computes scan fragments: one HBaseFragment per region that intersects a
+ * row-key range derived from the scan's indexable predicates. Region sizes
+ * are read from the cluster status so the scheduler can weigh fragments.
+ */
+ @Override
+ public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException {
+ ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+
+ List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode);
+ Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
+ HTable htable = null;
+ HBaseAdmin hAdmin = null;
+
+ try {
+ htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
+
+ org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+ if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+ // Table with a single, never-split region: one whole-range fragment.
+ HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+ if (null == regLoc) {
+ throw new IOException("Expecting at least one region.");
+ }
+ List<Fragment> fragments = new ArrayList<Fragment>(1);
+ Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname());
+ fragments.add(fragment);
+ return fragments;
+ }
+
+ // Translate each index predication into a [start, stop) row-key range;
+ // without predications, scan the whole key space.
+ List<byte[]> startRows;
+ List<byte[]> stopRows;
+
+ if (indexPredications != null && !indexPredications.isEmpty()) {
+ // indexPredications is Disjunctive set
+ startRows = new ArrayList<byte[]>();
+ stopRows = new ArrayList<byte[]>();
+ for (IndexPredication indexPredication: indexPredications) {
+ byte[] startRow;
+ byte[] stopRow;
+ if (indexPredication.getStartValue() != null) {
+ startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue());
+ } else {
+ startRow = HConstants.EMPTY_START_ROW;
+ }
+ if (indexPredication.getStopValue() != null) {
+ stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue());
+ } else {
+ stopRow = HConstants.EMPTY_END_ROW;
+ }
+ startRows.add(startRow);
+ stopRows.add(stopRow);
+ }
+ } else {
+ startRows = TUtil.newList(HConstants.EMPTY_START_ROW);
+ stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
+ }
+
+ hAdmin = new HBaseAdmin(hconf);
+ Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
+
+ // region startkey -> HBaseFragment
+ Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>();
+ for (int i = 0; i < keys.getFirst().length; i++) {
+ HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
+
+ byte[] regionStartKey = keys.getFirst()[i];
+ byte[] regionStopKey = keys.getSecond()[i];
+
+ int startRowsSize = startRows.size();
+ for (int j = 0; j < startRowsSize; j++) {
+ byte[] startRow = startRows.get(j);
+ byte[] stopRow = stopRows.get(j);
+ // determine if the given start and stop keys fall into the region
+ if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0)
+ && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+ // Clamp the scan range to the region's boundaries.
+ byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ?
+ regionStartKey : startRow;
+
+ byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) &&
+ regionStopKey.length > 0 ? regionStopKey : stopRow;
+
+ String regionName = location.getRegionInfo().getRegionNameAsString();
+
+ // Cluster status lookups are expensive; cache one ServerLoad per server.
+ ServerLoad serverLoad = serverLoadMap.get(location.getServerName());
+ if (serverLoad == null) {
+ serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName());
+ serverLoadMap.put(location.getServerName(), serverLoad);
+ }
+
+ if (fragmentMap.containsKey(regionStartKey)) {
+ // Another predicate range already hit this region: widen the
+ // existing fragment instead of emitting a duplicate.
+ HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
+ if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
+ prevFragment.setStartRow(fragmentStart);
+ }
+ if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
+ prevFragment.setStopRow(fragmentStop);
+ }
+ } else {
+ HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
+ fragmentStart, fragmentStop, location.getHostname());
+
+ // get region size
+ boolean foundLength = false;
+ for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) {
+ if (regionName.equals(Bytes.toString(entry.getKey()))) {
+ RegionLoad regionLoad = entry.getValue();
+ long storeFileSize = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L;
+ fragment.setLength(storeFileSize);
+ foundLength = true;
+ break;
+ }
+ }
+
+ if (!foundLength) {
+ fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
+ }
+
+ fragmentMap.put(regionStartKey, fragment);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
+ }
+ }
+ }
+ }
+ }
+
+ List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values());
+ Collections.sort(fragments);
+ if (!fragments.isEmpty()) {
+ // Mark the final fragment so the scanner knows where the table ends.
+ fragments.get(fragments.size() - 1).setLast(true);
+ }
+ return (ArrayList<Fragment>) (ArrayList) fragments;
+ } finally {
+ if (htable != null) {
+ htable.close();
+ }
+ if (hAdmin != null) {
+ hAdmin.close();
+ }
+ }
+ }
+
+ private byte[] serialize(ColumnMapping columnMapping,
+ IndexPredication indexPredication, Datum datum) throws IOException {
+ if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) {
+ return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum);
+ } else {
+ return HBaseTextSerializerDeserializer.serialize(indexPredication.getColumn(), datum);
+ }
+ }
+
+ @Override
+ public Appender getAppender(OverridableConf queryContext,
+ QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
+ throws IOException {
+ if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
+ return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir);
+ } else {
+ return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir);
+ }
+ }
+
+ /**
+ * Returns one page of region-aligned fragments for a non-forward (paged)
+ * scan.
+ *
+ * @param currentPage zero-based page index over the table's regions
+ * @param numFragments maximum number of fragments per page
+ */
+ @Override
+ public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
+ throws IOException {
+ Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
+ HTable htable = null;
+ HBaseAdmin hAdmin = null;
+ try {
+ htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
+
+ org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+ if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+ return new ArrayList<Fragment>(1);
+ }
+ hAdmin = new HBaseAdmin(hconf);
+ Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
+
+ List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length);
+
+ // Clamp the [start, end) region window for the requested page.
+ int start = currentPage * numFragments;
+ if (start >= keys.getFirst().length) {
+ return new ArrayList<Fragment>(1);
+ }
+ int end = (currentPage + 1) * numFragments;
+ if (end > keys.getFirst().length) {
+ end = keys.getFirst().length;
+ }
+ // One fragment per region in the page window.
+ for (int i = start; i < end; i++) {
+ HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false);
+
+ String regionName = location.getRegionInfo().getRegionNameAsString();
+ ServerLoad serverLoad = serverLoadMap.get(location.getServerName());
+ if (serverLoad == null) {
+ serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName());
+ serverLoadMap.put(location.getServerName(), serverLoad);
+ }
+
+ HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(),
+ location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname());
+
+ // get region size
+ boolean foundLength = false;
+ for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) {
+ if (regionName.equals(Bytes.toString(entry.getKey()))) {
+ RegionLoad regionLoad = entry.getValue();
+ long storeLength = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L;
+ if (storeLength == 0) {
+ // If store size is smaller than 1 MB, storeLength is zero
+ storeLength = 1 * 1024 * 1024; //default 1MB
+ }
+ fragment.setLength(storeLength);
+ foundLength = true;
+ break;
+ }
+ }
+
+ if (!foundLength) {
+ fragment.setLength(TajoConstants.UNKNOWN_LENGTH);
+ }
+
+ fragments.add(fragment);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFragments: fragment -> " + i + " -> " + fragment);
+ }
+ }
+
+ if (!fragments.isEmpty()) {
+ ((HBaseFragment) fragments.get(fragments.size() - 1)).setLast(true);
+ }
+ return fragments;
+ } finally {
+ if (htable != null) {
+ htable.close();
+ }
+ if (hAdmin != null) {
+ hAdmin.close();
+ }
+ }
+ }
+
+ public HConnection getConnection(Configuration hbaseConf) throws IOException {
+ synchronized(connMap) {
+ HConnectionKey key = new HConnectionKey(hbaseConf);
+ HConnection conn = connMap.get(key);
+ if (conn == null) {
+ conn = HConnectionManager.createConnection(hbaseConf);
+ connMap.put(key, conn);
+ }
+
+ return conn;
+ }
+ }
+
+ /**
+ * Cache key for shared HConnection instances. Two Configurations map to
+ * the same connection when every connection-relevant HBase property and
+ * the current user name are equal. Adapted from HBase's internal
+ * HConnectionManager.HConnectionKey.
+ */
+ static class HConnectionKey {
+ // Properties that determine connection identity.
+ final static String[] CONNECTION_PROPERTIES = new String[] {
+ HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
+ HConstants.ZOOKEEPER_CLIENT_PORT,
+ HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
+ HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+ HConstants.HBASE_META_SCANNER_CACHING,
+ HConstants.HBASE_CLIENT_INSTANCE_ID,
+ HConstants.RPC_CODEC_CONF_KEY };
+
+ private Map<String, String> properties;
+ private String username;
+
+ HConnectionKey(Configuration conf) {
+ Map<String, String> m = new HashMap<String, String>();
+ if (conf != null) {
+ for (String property : CONNECTION_PROPERTIES) {
+ String value = conf.get(property);
+ if (value != null) {
+ m.put(property, value);
+ }
+ }
+ }
+ this.properties = Collections.unmodifiableMap(m);
+
+ try {
+ UserProvider provider = UserProvider.instantiate(conf);
+ User currentUser = provider.getCurrent();
+ if (currentUser != null) {
+ username = currentUser.getName();
+ }
+ } catch (IOException ioe) {
+ // A key without a username still works; it just widens connection sharing.
+ LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ if (username != null) {
+ // NOTE(review): the username hash replaces the seed instead of being
+ // mixed in; kept as-is to match HBase's original implementation.
+ result = username.hashCode();
+ }
+ for (String property : CONNECTION_PROPERTIES) {
+ String value = properties.get(property);
+ if (value != null) {
+ result = prime * result + value.hashCode();
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ HConnectionKey that = (HConnectionKey) obj;
+ if (this.username != null && !this.username.equals(that.username)) {
+ return false;
+ } else if (this.username == null && that.username != null) {
+ return false;
+ }
+ if (this.properties == null) {
+ if (that.properties != null) {
+ return false;
+ }
+ } else {
+ if (that.properties == null) {
+ return false;
+ }
+ // Compare only the connection-relevant properties, not the whole map.
+ for (String property : CONNECTION_PROPERTIES) {
+ String thisValue = this.properties.get(property);
+ String thatValue = that.properties.get(property);
+ //noinspection StringEquality
+ if (thisValue == thatValue) {
+ continue;
+ }
+ if (thisValue == null || !thisValue.equals(thatValue)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "HConnectionKey{" +
+ "properties=" + properties +
+ ", username='" + username + '\'' +
+ '}';
+ }
+ }
+
+ public List<IndexPredication> getIndexPredications(ColumnMapping columnMapping,
+ TableDesc tableDesc, ScanNode scanNode) throws IOException {
+ List<IndexPredication> indexPredications = new ArrayList<IndexPredication>();
+ Column[] indexableColumns = getIndexableColumns(tableDesc);
+ if (indexableColumns != null && indexableColumns.length == 1) {
+ // Currently supports only single index column.
+ List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(scanNode, indexableColumns);
+ for (Set<EvalNode> eachEvalSet: indexablePredicateList) {
+ Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet);
+ if (indexPredicationValues != null) {
+ IndexPredication indexPredication = new IndexPredication();
+ indexPredication.setColumn(indexableColumns[0]);
+ indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName()));
+ indexPredication.setStartValue(indexPredicationValues.getFirst());
+ indexPredication.setStopValue(indexPredicationValues.getSecond());
+
+ indexPredications.add(indexPredication);
+ }
+ }
+ }
+ return indexPredications;
+ }
+
+ public List<Set<EvalNode>> findIndexablePredicateSet(ScanNode scanNode, Column[] indexableColumns) throws IOException {
+ List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>();
+
+ // if a query statement has a search condition, try to find indexable predicates
+ if (indexableColumns != null && scanNode.getQual() != null) {
+ EvalNode[] disjunctiveForms = AlgebraicUtil.toDisjunctiveNormalFormArray(scanNode.getQual());
+
+ // add qualifier to schema for qual
+ for (Column column : indexableColumns) {
+ for (EvalNode disjunctiveExpr : disjunctiveForms) {
+ EvalNode[] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(disjunctiveExpr);
+ Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
+ for (EvalNode conjunctiveExpr : conjunctiveForms) {
+ if (checkIfIndexablePredicateOnTargetColumn(conjunctiveExpr, column)) {
+ indexablePredicateSet.add(conjunctiveExpr);
+ }
+ }
+ if (!indexablePredicateSet.isEmpty()) {
+ indexablePredicateList.add(indexablePredicateSet);
+ }
+ }
+ }
+ }
+
+ return indexablePredicateList;
+ }
+
+ private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
+ if (checkIfIndexablePredicate(evalNode) || checkIfConjunctiveButOneVariable(evalNode)) {
+ Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
+ // if it contains only single variable matched to a target column
+ return variables.size() == 1 && variables.contains(targetColumn);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ *
+ * @param evalNode The expression to be checked
+ * @return true if an conjunctive expression, consisting of indexable expressions
+ */
+ private boolean checkIfConjunctiveButOneVariable(EvalNode evalNode) {
+ if (evalNode.getType() == EvalType.AND) {
+ BinaryEval orEval = (BinaryEval) evalNode;
+ boolean indexable =
+ checkIfIndexablePredicate(orEval.getLeftExpr()) &&
+ checkIfIndexablePredicate(orEval.getRightExpr());
+
+ boolean sameVariable =
+ EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr())
+ .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr()));
+
+ return indexable && sameVariable;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Check if an expression consists of one variable and one constant and
+ * the expression is a comparison operator.
+ *
+ * @param evalNode The expression to be checked
+ * @return true if an expression consists of one variable and one constant
+ * and the expression is a comparison operator. Other, false.
+ */
+ private boolean checkIfIndexablePredicate(EvalNode evalNode) {
+ return AlgebraicUtil.containSingleVar(evalNode) && isIndexableOperator(evalNode);
+ }
+
+ public static boolean isIndexableOperator(EvalNode expr) {
+ return expr.getType() == EvalType.EQUAL ||
+ expr.getType() == EvalType.LEQ ||
+ expr.getType() == EvalType.LTH ||
+ expr.getType() == EvalType.GEQ ||
+ expr.getType() == EvalType.GTH ||
+ expr.getType() == EvalType.BETWEEN;
+ }
+
+ /**
+ * Folds a conjunctive set of indexable predicates into a single
+ * [start, end] datum range over the row-key column. Binary comparisons
+ * tighten one bound each; BETWEEN contributes both bounds at once.
+ *
+ * @return the range pair, or null when neither bound could be derived
+ */
+ public Pair<Datum, Datum> getIndexablePredicateValue(ColumnMapping columnMapping,
+ Set<EvalNode> evalNodes) {
+ Datum startDatum = null;
+ Datum endDatum = null;
+ for (EvalNode evalNode: evalNodes) {
+ if (evalNode instanceof BinaryEval) {
+ BinaryEval binaryEval = (BinaryEval) evalNode;
+ EvalNode left = binaryEval.getLeftExpr();
+ EvalNode right = binaryEval.getRightExpr();
+
+ Datum constValue = null;
+ if (left.getType() == EvalType.CONST) {
+ constValue = ((ConstEval) left).getValue();
+ } else if (right.getType() == EvalType.CONST) {
+ constValue = ((ConstEval) right).getValue();
+ }
+
+ if (constValue != null) {
+ // =, >=, > tighten the lower bound upward.
+ if (evalNode.getType() == EvalType.EQUAL ||
+ evalNode.getType() == EvalType.GEQ ||
+ evalNode.getType() == EvalType.GTH) {
+ if (startDatum != null) {
+ if (constValue.compareTo(startDatum) > 0) {
+ startDatum = constValue;
+ }
+ } else {
+ startDatum = constValue;
+ }
+ }
+
+ // =, <=, < tighten the upper bound downward.
+ if (evalNode.getType() == EvalType.EQUAL ||
+ evalNode.getType() == EvalType.LEQ ||
+ evalNode.getType() == EvalType.LTH) {
+ if (endDatum != null) {
+ if (constValue.compareTo(endDatum) < 0) {
+ endDatum = constValue;
+ }
+ } else {
+ endDatum = constValue;
+ }
+ }
+ }
+ } else if (evalNode instanceof BetweenPredicateEval) {
+ // BETWEEN supplies both bounds when begin and end are constants.
+ BetweenPredicateEval betweenEval = (BetweenPredicateEval) evalNode;
+ if (betweenEval.getBegin().getType() == EvalType.CONST && betweenEval.getEnd().getType() == EvalType.CONST) {
+ Datum value = ((ConstEval) betweenEval.getBegin()).getValue();
+ if (startDatum != null) {
+ if (value.compareTo(startDatum) > 0) {
+ startDatum = value;
+ }
+ } else {
+ startDatum = value;
+ }
+
+ value = ((ConstEval) betweenEval.getEnd()).getValue();
+ if (endDatum != null) {
+ if (value.compareTo(endDatum) < 0) {
+ endDatum = value;
+ }
+ } else {
+ endDatum = value;
+ }
+ }
+ }
+ }
+
+ // For composite row keys, extend the end bound past every key that starts
+ // with the end value (delimiter followed by Character.MAX_VALUE).
+ if (endDatum != null && columnMapping != null && columnMapping.getNumRowKeys() > 1) {
+ endDatum = new TextDatum(endDatum.asChars() +
+ new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE}));
+ }
+ if (startDatum != null || endDatum != null) {
+ return new Pair<Datum, Datum>(startDatum, endDatum);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Finalizes a query that writes into an HBase-backed table: commits the
+ * staged output like a MapReduce job, then either delegates (INSERT INTO
+ * LOCATION) or bulk loads the produced HFiles into the HBase table.
+ */
+ @Override
+ public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+ LogicalPlan plan, Schema schema,
+ TableDesc tableDesc) throws IOException {
+ if (tableDesc == null) {
+ throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
+ }
+ Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+
+ Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
+ // Cap the bulk-load thread count.
+ hbaseConf.set("hbase.loadincremental.threads.max", "2");
+
+ JobContextImpl jobContext = new JobContextImpl(hbaseConf,
+ new JobID(finalEbId.getQueryId().toString(), finalEbId.getId()));
+
+ FileOutputCommitter committer = new FileOutputCommitter(stagingResultDir, jobContext);
+ Path jobAttemptPath = committer.getJobAttemptPath(jobContext);
+ FileSystem fs = jobAttemptPath.getFileSystem(queryContext.getConf());
+ if (!fs.exists(jobAttemptPath) || fs.listStatus(jobAttemptPath) == null) {
+ // Nothing was produced; there is nothing to commit or load.
+ LOG.warn("No query attempt file in " + jobAttemptPath);
+ return stagingResultDir;
+ }
+ committer.commitJob(jobContext);
+
+ if (tableDesc.getName() == null && tableDesc.getPath() != null) {
+
+ // insert into location
+ return super.commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, false);
+ } else {
+ // insert into table
+ String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY);
+
+ HTable htable = new HTable(hbaseConf, tableName);
+ try {
+ LoadIncrementalHFiles loadIncrementalHFiles = null;
+ try {
+ loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e.getMessage(), e);
+ }
+ loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable);
+
+ return stagingResultDir;
+ } finally {
+ htable.close();
+ }
+ }
+ }
+
  /**
   * Computes the tuple range assigned to each HBase region so that a sorted
   * insert can be partitioned along region boundaries.
   *
   * The table's region end keys are fetched and decoded back into tuples
   * using the row key column mapping; consecutive end keys delimit
   * consecutive ranges starting from the start of {@code dataRange}.
   *
   * @param queryContext the query context
   * @param tableDesc    descriptor of the target HBase table
   * @param inputSchema  schema of the tuples being inserted
   * @param sortSpecs    sort keys; assumed to correspond to the row key columns
   * @param dataRange    overall [start, end] range of the inserted data
   * @return one TupleRange per target region
   * @throws IOException if the region keys cannot be read or decoded
   */
  @Override
  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
                                          Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
      throws IOException {
    try {
      // Position of each sort key column within the input schema.
      int[] sortKeyIndexes = new int[sortSpecs.length];
      for (int i = 0; i < sortSpecs.length; i++) {
        sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
      }

      ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
      Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());

      HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName());
      try {
        byte[][] endKeys = htable.getEndKeys();
        if (endKeys.length == 1) {
          // Single-region table: the whole data range goes to one partition.
          return new TupleRange[]{dataRange};
        }
        List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length);

        TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs);
        Tuple previousTuple = dataRange.getStart();

        for (byte[] eachEndKey : endKeys) {
          Tuple endTuple = new VTuple(sortSpecs.length);
          byte[][] rowKeyFields;
          if (sortSpecs.length > 1) {
            // Composite row key: split the end key on the configured delimiter.
            byte[][] splitValues = BytesUtils.splitPreserveAllTokens(eachEndKey, columnMapping.getRowKeyDelimiter());
            if (splitValues.length == sortSpecs.length) {
              rowKeyFields = splitValues;
            } else {
              // A region end key may contain fewer fields than the row key has
              // columns; pad the missing trailing fields with null.
              rowKeyFields = new byte[sortSpecs.length][];
              for (int j = 0; j < sortSpecs.length; j++) {
                if (j < splitValues.length) {
                  rowKeyFields[j] = splitValues[j];
                } else {
                  rowKeyFields[j] = null;
                }
              }
            }

          } else {
            rowKeyFields = new byte[1][];
            rowKeyFields[0] = eachEndKey;
          }

          // Decode each key field with the serializer matching the column's
          // mapping (binary vs. text).
          for (int i = 0; i < sortSpecs.length; i++) {
            if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) {
              endTuple.put(i,
                  HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
                      rowKeyFields[i]));
            } else {
              endTuple.put(i,
                  HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
                      rowKeyFields[i]));
            }
          }
          tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple));
          previousTuple = endTuple;
        }

        // Last region endkey is empty. Tajo ignores empty key, so endkey is replaced with max data value.
        if (comparator.compare(dataRange.getEnd(), tupleRanges.get(tupleRanges.size() - 1).getStart()) >= 0) {
          tupleRanges.get(tupleRanges.size() - 1).setEnd(dataRange.getEnd());
        } else {
          // The data ends before the last region begins; drop the empty range.
          tupleRanges.remove(tupleRanges.size() - 1);
        }
        return tupleRanges.toArray(new TupleRange[]{});
      } finally {
        htable.close();
      }
    } catch (Throwable t) {
      // Broad catch so decoding failures from the serializers are surfaced
      // to the caller as IOException rather than escaping unchecked.
      LOG.error(t.getMessage(), t);
      throw new IOException(t.getMessage(), t);
    }
  }
+
+ public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
+ if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
+ List<RewriteRule> rules = new ArrayList<RewriteRule>();
+ rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc)));
+ return rules;
+ } else {
+ return null;
+ }
+ }
+
+ private Column[] getIndexColumns(TableDesc tableDesc) throws IOException {
+ List<Column> indexColumns = new ArrayList<Column>();
+
+ ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+
+ boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
+ for (int i = 0; i < isRowKeys.length; i++) {
+ if (isRowKeys[i]) {
+ indexColumns.add(tableDesc.getSchema().getColumn(i));
+ }
+ }
+
+ return indexColumns.toArray(new Column[]{});
+ }
+
+ @Override
+ public StorageProperty getStorageProperty() {
+ StorageProperty storageProperty = new StorageProperty();
+ storageProperty.setSortedInsert(true);
+ storageProperty.setSupportsInsertInto(true);
+ return storageProperty;
+ }
+
+ public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+ if (node.getType() == NodeType.CREATE_TABLE) {
+ CreateTableNode cNode = (CreateTableNode)node;
+ if (!cNode.isExternal()) {
+ TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
+ createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists());
+ }
+ }
+ }
+
+ @Override
+ public void rollbackOutputCommit(LogicalNode node) throws IOException {
+ if (node.getType() == NodeType.CREATE_TABLE) {
+ CreateTableNode cNode = (CreateTableNode)node;
+ if (cNode.isExternal()) {
+ return;
+ }
+ TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
+ HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableMeta));
+
+ try {
+ HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema());
+ LOG.info("Delete table cause query failed:" + hTableDesc.getName());
+ hAdmin.disableTable(hTableDesc.getName());
+ hAdmin.deleteTable(hTableDesc.getName());
+ } finally {
+ hAdmin.close();
+ }
+ }
+ }
+
+ @Override
+ public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
+ if (tableDesc != null) {
+ Schema tableSchema = tableDesc.getSchema();
+ if (tableSchema.size() != outSchema.size()) {
+ throw new IOException("The number of table columns is different from SELECT columns");
+ }
+
+ for (int i = 0; i < tableSchema.size(); i++) {
+ if (!tableSchema.getColumn(i).getDataType().equals(outSchema.getColumn(i).getDataType())) {
+ throw new IOException(outSchema.getColumn(i).getQualifiedName() +
+ "(" + outSchema.getColumn(i).getDataType().getType() + ")" +
+ " is different column type with " + tableSchema.getColumn(i).getSimpleName() +
+ "(" + tableSchema.getColumn(i).getDataType().getType() + ")");
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
new file mode 100644
index 0000000..a0ad492
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+
+public class HBaseTextSerializerDeserializer {
+ public static Datum deserialize(Column col, byte[] bytes) throws IOException {
+ Datum datum;
+ switch (col.getDataType().getType()) {
+ case INT1:
+ case INT2:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createInt2((short)NumberUtil.parseInt(bytes, 0, bytes.length));
+ break;
+ case INT4:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length));
+ break;
+ case INT8:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createInt8(new String(bytes, 0, bytes.length));
+ break;
+ case FLOAT4:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createFloat4(new String(bytes, 0, bytes.length));
+ break;
+ case FLOAT8:
+ datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+ DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length));
+ break;
+ case TEXT:
+ datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
+ break;
+ default:
+ datum = NullDatum.get();
+ break;
+ }
+ return datum;
+ }
+
+ public static byte[] serialize(Column col, Datum datum) throws IOException {
+ if (datum == null || datum instanceof NullDatum) {
+ return null;
+ }
+
+ return datum.asChars().getBytes();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
new file mode 100644
index 0000000..07f7988
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.util.TreeSet;
+
+public class HFileAppender extends AbstractHBaseAppender {
+ private static final Log LOG = LogFactory.getLog(HFileAppender.class);
+
+ private RecordWriter<ImmutableBytesWritable, Cell> writer;
+ private TaskAttemptContext writerContext;
+ private Path workingFilePath;
+ private FileOutputCommitter committer;
+
+ public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ Schema schema, TableMeta meta, Path stagingDir) {
+ super(conf, taskAttemptId, schema, meta, stagingDir);
+ }
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+
+ Configuration taskConf = new Configuration();
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+ taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
+
+ ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId();
+ writerContext = new TaskAttemptContextImpl(taskConf,
+ new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
+ taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId()));
+
+ HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
+ try {
+ writer = hFileOutputFormat2.getRecordWriter(writerContext);
+
+ committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext);
+ workingFilePath = committer.getWorkPath();
+ } catch (InterruptedException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+
+ LOG.info("Created hbase file writer: " + workingFilePath);
+ }
+
+ long totalNumBytes = 0;
+ ImmutableBytesWritable keyWritable = new ImmutableBytesWritable();
+ boolean first = true;
+ TreeSet<KeyValue> kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+
+
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ Datum datum;
+
+ byte[] rowkey = getRowKeyBytes(tuple);
+
+ if (!first && !Bytes.equals(keyWritable.get(), 0, keyWritable.getLength(), rowkey, 0, rowkey.length)) {
+ try {
+ for (KeyValue kv : kvSet) {
+ writer.write(keyWritable, kv);
+ totalNumBytes += keyWritable.getLength() + kv.getLength();
+ }
+ kvSet.clear();
+ // Statistical section
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ first = false;
+
+ keyWritable.set(rowkey);
+
+ readKeyValues(tuple, rowkey);
+ if (keyValues != null) {
+ for (KeyValue eachKeyVal: keyValues) {
+ kvSet.add(eachKeyVal);
+ }
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public long getEstimatedOutputSize() throws IOException {
+ // StoreTableExec uses this value as rolling file length
+ // Not rolling
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!kvSet.isEmpty()) {
+ try {
+ for (KeyValue kv : kvSet) {
+ writer.write(keyWritable, kv);
+ totalNumBytes += keyWritable.getLength() + keyWritable.getLength();
+ }
+ kvSet.clear();
+ // Statistical section
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ if (enabledStats) {
+ stats.setNumBytes(totalNumBytes);
+ }
+ if (writer != null) {
+ try {
+ writer.close(writerContext);
+ committer.commitTask(writerContext);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
new file mode 100644
index 0000000..3a58e50
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+public class IndexPredication {
+ private Column column;
+ private int columnId;
+ private Datum startValue;
+ private Datum stopValue;
+
+ public Column getColumn() {
+ return column;
+ }
+
+ public void setColumn(Column column) {
+ this.column = column;
+ }
+
+ public int getColumnId() {
+ return columnId;
+ }
+
+ public void setColumnId(int columnId) {
+ this.columnId = columnId;
+ }
+
+ public Datum getStartValue() {
+ return startValue;
+ }
+
+ public void setStartValue(Datum startValue) {
+ this.startValue = startValue;
+ }
+
+ public Datum getStopValue() {
+ return stopValue;
+ }
+
+ public void setStopValue(Datum stopValue) {
+ this.stopValue = stopValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
new file mode 100644
index 0000000..4577703
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
/**
 * Describes how a single table column maps onto the HBase row key:
 * whether the value is stored in binary form, and which field of the
 * (possibly composite) key the column occupies.
 */
public class RowKeyMapping {
  // True when the key field is stored in binary form rather than text.
  private boolean isBinary;
  // Index of the key field this column maps to within the row key.
  private int keyFieldIndex;

  public boolean isBinary() { return isBinary; }

  public void setBinary(boolean binary) { this.isBinary = binary; }

  public int getKeyFieldIndex() { return keyFieldIndex; }

  public void setKeyFieldIndex(int index) { this.keyFieldIndex = index; }
}