You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/24 00:49:15 UTC
[2/7] tajo git commit: TAJO-1616: Implement TablespaceManager to load
Tablespaces.
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 845c2d7..5fac0cf 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -18,7 +18,6 @@
package org.apache.tajo.storage.hbase;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,12 +43,12 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.CreateTableNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.Bytes;
@@ -60,6 +59,7 @@ import org.apache.tajo.util.TUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.net.URI;
import java.util.*;
/**
@@ -68,14 +68,44 @@ import java.util.*;
public class HBaseTablespace extends Tablespace {
private static final Log LOG = LogFactory.getLog(HBaseTablespace.class);
+ public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty(false, true, true, false);
+
+ public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true);
+
+ private Configuration hbaseConf;
+
+ private final static SortedInsertRewriter REWRITE_RULE = new SortedInsertRewriter();
+
private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>();
- public HBaseTablespace(String storeType) {
- super(storeType);
+ public HBaseTablespace(String spaceName, URI uri) {
+ super(spaceName, uri);
}
@Override
public void storageInit() throws IOException {
+ this.hbaseConf = HBaseConfiguration.create(conf);
+ String zkQuorum = extractQuorum(uri);
+ String [] splits = zkQuorum.split(":");
+ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, splits[0]);
+ hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, splits[1]);
+ }
+
+ @Override
+ public void setConfig(String name, String value) {
+ }
+
+ @Override
+ public void setConfigs(Map<String, String> configs) {
+ }
+
+ public Configuration getHbaseConf() {
+ return hbaseConf;
+ }
+
+ @Override
+ public long getTableVolume(URI uri) throws IOException {
+ return 0;
}
@Override
@@ -93,13 +123,13 @@ public class HBaseTablespace extends Tablespace {
@Override
public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
- createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists);
+ createTable(tableDesc.getUri(), tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists);
TableStats stats = new TableStats();
stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
tableDesc.setStats(stats);
}
- private void createTable(TableMeta tableMeta, Schema schema,
+ private void createTable(URI uri, TableMeta tableMeta, Schema schema,
boolean isExternal, boolean ifNotExists) throws IOException {
String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, "");
if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) {
@@ -113,7 +143,7 @@ public class HBaseTablespace extends Tablespace {
throw new IOException("Columns property has more entry than Tajo table columns");
}
- ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+ ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
int numRowKeys = 0;
boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
for (int i = 0; i < isRowKeyMappings.length; i++) {
@@ -138,8 +168,7 @@ public class HBaseTablespace extends Tablespace {
}
}
- Configuration hConf = getHBaseConfiguration(conf, tableMeta);
- HBaseAdmin hAdmin = new HBaseAdmin(hConf);
+ HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf());
try {
if (isExternal) {
@@ -210,7 +239,7 @@ public class HBaseTablespace extends Tablespace {
return null;
}
- ColumnMapping columnMapping = new ColumnMapping(schema, meta);
+ ColumnMapping columnMapping = new ColumnMapping(schema, meta.getOptions());
boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns();
boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
@@ -291,45 +320,23 @@ public class HBaseTablespace extends Tablespace {
}
/**
- * Creates Configuration instance and sets with hbase connection options.
+ * It extracts quorum addresses from an HBase Tablespace URI.
+ * For example, consider an example URI 'hbase:zk://host1:2171,host2:2172,host3:2173/table1'.
+ * <code>extractQuorum</code> will extract only 'host1:2171,host2:2172,host3:2173'.
*
- * @param conf
- * @param tableMeta
- * @return
- * @throws java.io.IOException
+ * @param uri Hbase Tablespace URI
+ * @return Quorum addresses
*/
- public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException {
- Configuration hbaseConf = (conf == null) ? HBaseConfiguration.create() : HBaseConfiguration.create(conf);
-
- String zkQuorum = hbaseConf.get(HConstants.ZOOKEEPER_QUORUM);
- if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_QUORUM_KEY)) {
- zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, "");
- hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
- }
-
- if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
- throw new IOException("HBase mapped table is required a '" +
- HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute.");
- }
+ static String extractQuorum(URI uri) {
+ String uriStr = uri.toString();
+ int start = uriStr.indexOf("/") + 2;
+ int pathIndex = uriStr.lastIndexOf("/");
- String zkPort = hbaseConf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_CLIENT_PORT)) {
- zkPort = tableMeta.getOption(HBaseStorageConstants.META_ZK_CLIENT_PORT, "");
- hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort);
- }
-
- if (zkPort == null || zkPort.trim().isEmpty()) {
- throw new IOException("HBase mapped table is required a '" +
- HBaseStorageConstants.META_ZK_CLIENT_PORT + "' attribute.");
- }
-
- for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) {
- String key = eachOption.getKey();
- if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
- hbaseConf.set(key, eachOption.getValue());
- }
+ if (pathIndex < start) {
+ return uriStr.substring(start);
+ } else {
+ return uriStr.substring(start, pathIndex);
}
- return hbaseConf;
}
/**
@@ -348,7 +355,7 @@ public class HBaseTablespace extends Tablespace {
}
TableName hTableName = TableName.valueOf(hbaseTableName);
- ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+ ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName);
@@ -369,7 +376,7 @@ public class HBaseTablespace extends Tablespace {
@Override
public void purgeTable(TableDesc tableDesc) throws IOException {
- HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta()));
+ HBaseAdmin hAdmin = new HBaseAdmin(hbaseConf);
try {
HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema());
@@ -381,6 +388,11 @@ public class HBaseTablespace extends Tablespace {
}
}
+ @Override
+ public URI getTableUri(String databaseName, String tableName) {
+ return URI.create(uri.toString() + "/" + tableName);
+ }
+
/**
* Returns columns which are mapped to the rowkey of the hbase table.
*
@@ -389,7 +401,7 @@ public class HBaseTablespace extends Tablespace {
* @throws java.io.IOException
*/
private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException {
- ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+ ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes();
@@ -407,15 +419,14 @@ public class HBaseTablespace extends Tablespace {
@Override
public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException {
- ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+ ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode);
- Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
HTable htable = null;
HBaseAdmin hAdmin = null;
try {
- htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
+ htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
@@ -424,8 +435,12 @@ public class HBaseTablespace extends Tablespace {
throw new IOException("Expecting at least one region.");
}
List<Fragment> fragments = new ArrayList<Fragment>(1);
- Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
- HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname());
+ Fragment fragment = new HBaseFragment(
+ tableDesc.getUri(),
+ fragmentId, htable.getName().getNameAsString(),
+ HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.EMPTY_BYTE_ARRAY,
+ regLoc.getHostname());
fragments.add(fragment);
return fragments;
}
@@ -458,7 +473,7 @@ public class HBaseTablespace extends Tablespace {
stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
}
- hAdmin = new HBaseAdmin(hconf);
+ hAdmin = new HBaseAdmin(hbaseConf);
Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
// region startkey -> HBaseFragment
@@ -499,8 +514,12 @@ public class HBaseTablespace extends Tablespace {
prevFragment.setStopRow(fragmentStop);
}
} else {
- HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
- fragmentStart, fragmentStop, location.getHostname());
+ HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(),
+ fragmentId,
+ htable.getName().getNameAsString(),
+ fragmentStart,
+ fragmentStop,
+ location.getHostname());
// get region size
boolean foundLength = false;
@@ -557,7 +576,7 @@ public class HBaseTablespace extends Tablespace {
TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
throws IOException {
if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
- return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir);
+ return new HBasePutAppender(conf, uri, taskAttemptId, schema, meta, workDir);
} else {
return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir);
}
@@ -566,17 +585,16 @@ public class HBaseTablespace extends Tablespace {
@Override
public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
throws IOException {
- Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
HTable htable = null;
HBaseAdmin hAdmin = null;
try {
- htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
+ htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
return new ArrayList<Fragment>(1);
}
- hAdmin = new HBaseAdmin(hconf);
+ hAdmin = new HBaseAdmin(hbaseConf);
Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length);
@@ -599,8 +617,13 @@ public class HBaseTablespace extends Tablespace {
serverLoadMap.put(location.getServerName(), serverLoad);
}
- HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(),
- location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname());
+ HBaseFragment fragment = new HBaseFragment(
+ tableDesc.getUri(),
+ tableDesc.getName(),
+ htable.getName().getNameAsString(),
+ location.getRegionInfo().getStartKey(),
+ location.getRegionInfo().getEndKey(),
+ location.getHostname());
// get region size
boolean foundLength = false;
@@ -642,7 +665,7 @@ public class HBaseTablespace extends Tablespace {
}
}
- public HConnection getConnection(Configuration hbaseConf) throws IOException {
+ public HConnection getConnection() throws IOException {
synchronized(connMap) {
HConnectionKey key = new HConnectionKey(hbaseConf);
HConnection conn = connMap.get(key);
@@ -937,18 +960,17 @@ public class HBaseTablespace extends Tablespace {
}
@Override
- public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
- LogicalPlan plan, Schema schema,
- TableDesc tableDesc) throws IOException {
+ public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId,
+ LogicalPlan plan, Schema schema,
+ TableDesc tableDesc) throws IOException {
if (tableDesc == null) {
throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
}
- Preconditions.checkArgument(tableDesc.getName() != null && tableDesc.getPath() == null);
Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
- Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
+ Configuration hbaseConf = HBaseConfiguration.create(this.hbaseConf);
hbaseConf.set("hbase.loadincremental.threads.max", "2");
JobContextImpl jobContext = new JobContextImpl(hbaseConf,
@@ -993,8 +1015,7 @@ public class HBaseTablespace extends Tablespace {
sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
}
- ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
- Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
+ ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName());
try {
@@ -1062,59 +1083,45 @@ public class HBaseTablespace extends Tablespace {
}
}
- public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
- if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
- List<LogicalPlanRewriteRule> rules = new ArrayList<LogicalPlanRewriteRule>();
- rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc)));
- return rules;
- } else {
- return null;
+ @Override
+ public void rewritePlan(OverridableConf context, LogicalPlan plan) throws PlanningException {
+ if (REWRITE_RULE.isEligible(context, plan)) {
+ REWRITE_RULE.rewrite(context, plan);
}
}
- private Column[] getIndexColumns(TableDesc tableDesc) throws IOException {
- List<Column> indexColumns = new ArrayList<Column>();
-
- ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-
- boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
- for (int i = 0; i < isRowKeys.length; i++) {
- if (isRowKeys[i]) {
- indexColumns.add(tableDesc.getSchema().getColumn(i));
- }
- }
-
- return indexColumns.toArray(new Column[]{});
+ @Override
+ public StorageProperty getProperty() {
+ return HBASE_STORAGE_PROPERTIES;
}
@Override
- public StorageProperty getStorageProperty() {
- StorageProperty storageProperty = new StorageProperty();
- storageProperty.setSortedInsert(true);
- storageProperty.setSupportsInsertInto(true);
- return storageProperty;
+ public FormatProperty getFormatProperty(String format) {
+ return HFILE_FORMAT_PROPERTIES;
}
- public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+ public void prepareTable(LogicalNode node) throws IOException {
if (node.getType() == NodeType.CREATE_TABLE) {
CreateTableNode cNode = (CreateTableNode)node;
if (!cNode.isExternal()) {
TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
- createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists());
+ createTable(
+ ((CreateTableNode) node).getUri(), tableMeta, cNode.getTableSchema(),
+ cNode.isExternal(), cNode.isIfNotExists());
}
}
}
@Override
- public void rollbackOutputCommit(LogicalNode node) throws IOException {
+ public void rollbackTable(LogicalNode node) throws IOException {
if (node.getType() == NodeType.CREATE_TABLE) {
CreateTableNode cNode = (CreateTableNode)node;
if (cNode.isExternal()) {
return;
}
- TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
- HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableMeta));
+ HBaseAdmin hAdmin = new HBaseAdmin(this.hbaseConf);
+ TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
try {
HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema());
LOG.info("Delete table cause query failed:" + new String(hTableDesc.getName()));
@@ -1127,7 +1134,7 @@ public class HBaseTablespace extends Tablespace {
}
@Override
- public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
+ public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
if (tableDesc != null) {
Schema tableSchema = tableDesc.getSchema();
if (tableSchema.size() != outSchema.size()) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
new file mode 100644
index 0000000..ebf557e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
@@ -0,0 +1,116 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.hbase;

import org.apache.tajo.OverridableConf;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.logical.SortNode.SortPurpose;
import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.util.KeyValueSet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * This rewrite rule injects a sort operation so that rows are written in
 * ascending order of HBase row keys, as required by the HFile format.
 */
public class SortedInsertRewriter implements LogicalPlanRewriteRule {

  @Override
  public String getName() {
    return "SortedInsertRewriter";
  }

  /**
   * The rule applies only to bulk-load (HFile) writes, i.e. when the
   * {@code INSERT_PUT_MODE} property is "false", and only when the query
   * root is a CREATE TABLE or an INSERT node.
   */
  @Override
  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
    boolean hbaseMode = "false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
    LogicalNode node = rootNode.getChild();
    // Parentheses are required here: '&&' binds tighter than '||', so the
    // unparenthesized form would make every INSERT eligible regardless of
    // the put-mode setting.
    return hbaseMode
        && (node.getType() == NodeType.CREATE_TABLE || node.getType() == NodeType.INSERT);
  }

  /**
   * Returns the columns of the given table schema which are mapped to the
   * HBase row key, according to the column-mapping table property.
   *
   * @param tableSchema   schema of the target table
   * @param tableProperty table properties containing the HBase column mapping
   * @return the row-key-mapped columns, in schema order
   * @throws IOException if the column mapping is invalid
   */
  public static Column[] getIndexColumns(Schema tableSchema, KeyValueSet tableProperty) throws IOException {
    List<Column> indexColumns = new ArrayList<Column>();

    ColumnMapping columnMapping = new ColumnMapping(tableSchema, tableProperty);

    boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
    for (int i = 0; i < isRowKeys.length; i++) {
      if (isRowKeys[i]) {
        indexColumns.add(tableSchema.getColumn(i));
      }
    }

    return indexColumns.toArray(new Column[]{});
  }

  /**
   * Inserts a {@link SortNode} (purpose {@code STORAGE_SPECIFIED}) between the
   * store/insert node and its child, sorting on the row-key-mapped columns so
   * that output rows arrive in HFile key order.
   */
  @Override
  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
    LogicalRootNode rootNode = plan.getRootBlock().getRoot();

    StoreTableNode storeTable = rootNode.getChild();
    Schema tableSchema = storeTable.getTableSchema();

    Column[] sortColumns;
    try {
      sortColumns = getIndexColumns(tableSchema, storeTable.getOptions());
    } catch (IOException e) {
      throw new PlanningException(e);
    }

    // Resolve each row-key column to its position in the table schema; the
    // same positions are used to look the columns up in the child's output
    // schema below.
    int[] sortColumnIndexes = new int[sortColumns.length];
    for (int i = 0; i < sortColumns.length; i++) {
      sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
    }

    UnaryNode insertNode = rootNode.getChild();
    LogicalNode childNode = insertNode.getChild();

    // The sort operates on (and emits) the child's output schema unchanged.
    Schema sortSchema = childNode.getOutSchema();
    SortNode sortNode = plan.createNode(SortNode.class);
    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
    sortNode.setInSchema(sortSchema);
    sortNode.setOutSchema(sortSchema);

    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
    for (int i = 0; i < sortColumnIndexes.length; i++) {
      Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
      if (sortColumn == null) {
        throw new PlanningException("Can't find proper sort column:" + sortColumns[i]);
      }
      // Ascending, nulls-first: matches HBase's byte-wise row key ordering.
      sortSpecs[i] = new SortSpec(sortColumn, true, true);
    }
    sortNode.setSortSpecs(sortSpecs);

    // Splice the sort between the insert/store node and its former child.
    sortNode.setChild(insertNode.getChild());
    insertNode.setChild(sortNode);
    plan.getRootBlock().registerNode(sortNode);

    return plan;
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
index 668b116..33d45b3 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
+++ b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
@@ -25,11 +25,12 @@ option java_generate_equals_and_hash = true;
import "CatalogProtos.proto";
message HBaseFragmentProto {
- required string tableName = 1;
- required string hbaseTableName = 2;
- required bytes startRow = 3;
- required bytes stopRow = 4;
- required bool last = 5;
- required int64 length = 6;
- optional string regionLocation = 7;
+ required string uri = 1;
+ required string tableName = 2;
+ required string hbaseTableName = 3;
+ required bytes startRow = 4;
+ required bytes stopRow = 5;
+ required bool last = 6;
+ required int64 length = 7;
+ optional string regionLocation = 8;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
index dd52324..5b1e2bd 100644
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
@@ -43,7 +43,7 @@ public class TestColumnMapping {
TableMeta tableMeta = new TableMeta("HBASE", keyValueSet);
- ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+ ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
List<String> cfNames = columnMapping.getColumnFamilyNames();
assertEquals(2, cfNames.size());
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
deleted file mode 100644
index b59fe7b..0000000
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.TableSpaceManager;
-import org.apache.tajo.util.Pair;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class TestHBaseStorageManager {
- @Test
- public void testGetIndexPredications() throws Exception {
- Column rowkeyColumn = new Column("rk", Type.TEXT);
- // where rk >= '020' and rk <= '055'
- ScanNode scanNode = new ScanNode(1);
- EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020")));
- EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055")));
- EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
- scanNode.setQual(evalNodeA);
-
- HBaseTablespace storageManager =
- (HBaseTablespace) TableSpaceManager.getStorageManager(new TajoConf(), "HBASE");
- List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
- assertNotNull(indexEvals);
- assertEquals(1, indexEvals.size());
- Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
- assertEquals("020", indexPredicateValue.getFirst().asChars());
- assertEquals("055", indexPredicateValue.getSecond().asChars());
-
- // where (rk >= '020' and rk <= '055') or rk = '075'
- EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn),new ConstEval(new TextDatum("075")));
- EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
- scanNode.setQual(evalNodeB);
- indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
- assertEquals(2, indexEvals.size());
- indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
- assertEquals("020", indexPredicateValue.getFirst().asChars());
- assertEquals("055", indexPredicateValue.getSecond().asChars());
-
- indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
- assertEquals("075", indexPredicateValue.getFirst().asChars());
- assertEquals("075", indexPredicateValue.getSecond().asChars());
-
- // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
- EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
- EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
- EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
- EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
- scanNode.setQual(evalNodeD);
- indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
- assertEquals(2, indexEvals.size());
-
- indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
- assertEquals("020", indexPredicateValue.getFirst().asChars());
- assertEquals("055", indexPredicateValue.getSecond().asChars());
-
- indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
- assertEquals("072", indexPredicateValue.getFirst().asChars());
- assertEquals("078", indexPredicateValue.getSecond().asChars());
-
- // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073')
- evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
- evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
- evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
- EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073")));
- evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6);
- EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD);
- scanNode.setQual(evalNodeE);
- indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
- assertEquals(2, indexEvals.size());
-
- indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
- assertEquals("020", indexPredicateValue.getFirst().asChars());
- assertEquals("055", indexPredicateValue.getSecond().asChars());
-
- indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
- assertEquals("073", indexPredicateValue.getFirst().asChars());
- assertEquals("078", indexPredicateValue.getSecond().asChars());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
new file mode 100644
index 0000000..f7cbb5a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.util.Pair;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHBaseTableSpace {
+ @BeforeClass
+ public static void setUp() throws IOException {
+ String tableSpaceUri = "hbase:zk://host1:2171";
+ HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
+ hBaseTablespace.init(new TajoConf());
+ TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
+ }
+
+ @Test
+ public void testExtractQuorum() {
+ assertEquals("host1:2171", HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171")));
+ assertEquals("host1:2171", HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171/table1")));
+ assertEquals("host1:2171,host2:2172",
+ HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171,host2:2172/table1")));
+ }
+
+ @Test
+ public void testTablespaceHandler() throws Exception {
+ assertTrue((TableSpaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
+ assertTrue((TableSpaceManager.get(URI.create("hbase:zk://host1:2171")).get())
+ instanceof HBaseTablespace);
+ }
+
+ @Test
+ public void testGetIndexPredications() throws Exception {
+ Column rowkeyColumn = new Column("rk", Type.TEXT);
+ // where rk >= '020' and rk <= '055'
+ ScanNode scanNode = new ScanNode(1);
+ EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020")));
+ EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055")));
+ EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
+ scanNode.setQual(evalNodeA);
+
+ HBaseTablespace storageManager = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+ List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+ assertNotNull(indexEvals);
+ assertEquals(1, indexEvals.size());
+ Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+ assertEquals("020", indexPredicateValue.getFirst().asChars());
+ assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+ // where (rk >= '020' and rk <= '055') or rk = '075'
+ EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn),new ConstEval(new TextDatum("075")));
+ EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
+ scanNode.setQual(evalNodeB);
+ indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+ assertEquals(2, indexEvals.size());
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+ assertEquals("020", indexPredicateValue.getFirst().asChars());
+ assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+ assertEquals("075", indexPredicateValue.getFirst().asChars());
+ assertEquals("075", indexPredicateValue.getSecond().asChars());
+
+ // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
+ EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+ EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+ EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+ EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+ scanNode.setQual(evalNodeD);
+ indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+ assertEquals(2, indexEvals.size());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+ assertEquals("020", indexPredicateValue.getFirst().asChars());
+ assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+ assertEquals("072", indexPredicateValue.getFirst().asChars());
+ assertEquals("078", indexPredicateValue.getSecond().asChars());
+
+ // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073')
+ evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+ evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+ evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+ EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073")));
+ evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6);
+ EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD);
+ scanNode.setQual(evalNodeE);
+ indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+ assertEquals(2, indexEvals.size());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+ assertEquals("020", indexPredicateValue.getFirst().asChars());
+ assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+ indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+ assertEquals("073", indexPredicateValue.getFirst().asChars());
+ assertEquals("078", indexPredicateValue.getSecond().asChars());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 3a59ec9..9b98b0d 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -74,6 +74,8 @@
<configuration>
<excludes>
<exclude>src/test/resources/dataset/**</exclude>
+ <exclude>src/main/resources/*.json</exclude>
+ <exclude>src/test/resources/*.json</exclude>
</excludes>
</configuration>
</plugin>
@@ -350,10 +352,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
- <dependency>
- <groupId>net.minidev</groupId>
- <artifactId>json-smart</artifactId>
- </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 5c8242f..081fa3f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -18,6 +18,7 @@
package org.apache.tajo.storage;
+import com.google.common.base.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -51,20 +52,23 @@ public abstract class FileAppender implements Appender {
this.workDir = workDir;
this.taskAttemptId = taskAttemptId;
- try {
- if (taskAttemptId != null) {
- if (!(conf instanceof TajoConf)) {
- throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
- }
- this.path = ((FileTablespace) TableSpaceManager.getFileStorageManager((TajoConf) conf))
- .getAppenderFilePath(taskAttemptId, workDir);
- } else {
- this.path = workDir;
+ if (taskAttemptId != null) {
+ if (!(conf instanceof TajoConf)) {
+ throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
}
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- throw new IllegalStateException("Error while opeining FileAppender: " + e.getMessage(), e);
+
+ Optional<FileTablespace> spaceResult = TableSpaceManager.get(workDir.toUri());
+
+ if (!spaceResult.isPresent()) {
+ throw new IllegalStateException("No TableSpace for " + workDir.toUri());
+ }
+
+ FileTablespace space = spaceResult.get();
+ this.path = space.getAppenderFilePath(taskAttemptId, workDir);
+
+ } else {
+ this.path = workDir;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 6ab8574..2ce1f09 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -35,18 +35,28 @@ import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.TUtil;
import java.io.IOException;
+import java.net.URI;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
+
public class FileTablespace extends Tablespace {
+
+ public static final PathFilter hiddenFileFilter = new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
private final Log LOG = LogFactory.getLog(FileTablespace.class);
static final String OUTPUT_FILE_PREFIX="part-";
@@ -83,27 +93,54 @@ public class FileTablespace extends Tablespace {
};
protected FileSystem fs;
- protected Path tableBaseDir;
+ protected Path basePath;
protected boolean blocksMetadataEnabled;
private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
- public FileTablespace(String storeType) {
- super(storeType);
+ public FileTablespace(String spaceName, URI uri) {
+ super(spaceName, uri);
}
@Override
protected void storageInit() throws IOException {
- this.tableBaseDir = TajoConf.getWarehouseDir(conf);
- this.fs = tableBaseDir.getFileSystem(conf);
- this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
- if (!this.blocksMetadataEnabled)
+ this.basePath = new Path(uri);
+ this.fs = basePath.getFileSystem(conf);
+ this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString());
+
+ this.blocksMetadataEnabled =
+ conf.getBoolean(DFS_HDFS_BLOCKS_METADATA_ENABLED, DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+ if (!this.blocksMetadataEnabled) {
LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
+ }
+ }
+
+ @Override
+ public void setConfig(String name, String value) {
+ conf.set(name, value);
+ }
+
+ @Override
+ public void setConfigs(Map<String, String> configs) {
+ for (Map.Entry<String, String> c : configs.entrySet()) {
+ conf.set(c.getKey(), c.getValue());
+ }
+ }
+
+ @Override
+ public long getTableVolume(URI uri) throws IOException {
+ Path path = new Path(uri);
+ ContentSummary summary = fs.getContentSummary(path);
+ return summary.getLength();
+ }
+
+ @Override
+ public URI getRootUri() {
+ return fs.getUri();
}
public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
throws IOException {
- FileSystem fs = path.getFileSystem(conf);
FileStatus status = fs.getFileStatus(path);
return getFileScanner(meta, schema, path, status);
}
@@ -128,8 +165,9 @@ public class FileTablespace extends Tablespace {
return fileSystem.exists(path);
}
- public Path getTablePath(String tableName) {
- return new Path(tableBaseDir, tableName);
+ @Override
+ public URI getTableUri(String databaseName, String tableName) {
+ return StorageUtil.concatPath(basePath, databaseName, tableName).toUri();
}
private String partitionPath = "";
@@ -154,12 +192,12 @@ public class FileTablespace extends Tablespace {
}
public FileFragment[] split(String tableName) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
+ Path tablePath = new Path(basePath, tableName);
return split(tableName, tablePath, fs.getDefaultBlockSize());
}
public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
+ Path tablePath = new Path(basePath, tableName);
return split(tableName, tablePath, fragmentSize);
}
@@ -314,7 +352,6 @@ public class FileTablespace extends Tablespace {
for (int i = 0; i < dirs.length; ++i) {
Path p = dirs[i];
- FileSystem fs = p.getFileSystem(conf);
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
@@ -323,8 +360,7 @@ public class FileTablespace extends Tablespace {
} else {
for (FileStatus globStat : matches) {
if (globStat.isDirectory()) {
- for (FileStatus stat : fs.listStatus(globStat.getPath(),
- inputFilter)) {
+ for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
result.add(stat);
}
} else {
@@ -492,8 +528,6 @@ public class FileTablespace extends Tablespace {
List<BlockLocation> blockLocations = Lists.newArrayList();
for (Path p : inputs) {
- FileSystem fs = p.getFileSystem(conf);
-
ArrayList<FileStatus> files = Lists.newArrayList();
if (fs.isFile(p)) {
files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
@@ -586,7 +620,7 @@ public class FileTablespace extends Tablespace {
return;
}
- DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
+ DistributedFileSystem fs = (DistributedFileSystem) this.fs;
int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
int blockLocationIdx = 0;
@@ -629,7 +663,7 @@ public class FileTablespace extends Tablespace {
@Override
public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException {
- return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath()));
+ return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getUri()));
}
@Override
@@ -640,13 +674,13 @@ public class FileTablespace extends Tablespace {
String simpleTableName = splitted[1];
// create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
- Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
- tableDesc.setPath(tablePath.toUri());
+ Path tablePath = StorageUtil.concatPath(basePath, databaseName, simpleTableName);
+ tableDesc.setUri(tablePath.toUri());
} else {
- Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given.");
+ Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given.");
}
- Path path = new Path(tableDesc.getPath());
+ Path path = new Path(tableDesc.getUri());
FileSystem fs = path.getFileSystem(conf);
TableStats stats = new TableStats();
@@ -679,7 +713,7 @@ public class FileTablespace extends Tablespace {
@Override
public void purgeTable(TableDesc tableDesc) throws IOException {
try {
- Path path = new Path(tableDesc.getPath());
+ Path path = new Path(tableDesc.getUri());
FileSystem fs = path.getFileSystem(conf);
LOG.info("Delete table data dir: " + path);
fs.delete(path, true);
@@ -692,7 +726,7 @@ public class FileTablespace extends Tablespace {
public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException {
// Listing table data file which is not empty.
// If the table is a partitioned table, return file list which has same partition key.
- Path tablePath = new Path(tableDesc.getPath());
+ Path tablePath = new Path(tableDesc.getUri());
FileSystem fs = tablePath.getFileSystem(conf);
//In the case of partitioned table, we should return same partition key data files.
@@ -704,7 +738,7 @@ public class FileTablespace extends Tablespace {
List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
if (fs.exists(tablePath)) {
if (!partitionPath.isEmpty()) {
- Path partPath = new Path(tableDesc.getPath() + partitionPath);
+ Path partPath = new Path(tableDesc.getUri() + partitionPath);
if (fs.exists(partPath)) {
getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments,
new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth);
@@ -768,7 +802,7 @@ public class FileTablespace extends Tablespace {
// Intermediate directory
if (fs.isDirectory(path)) {
- FileStatus[] files = fs.listStatus(path, Tablespace.hiddenFileFilter);
+ FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
if (files != null && files.length > 0) {
@@ -817,44 +851,43 @@ public class FileTablespace extends Tablespace {
}
}
- @Override
- public StorageProperty getStorageProperty() {
- StorageProperty storageProperty = new StorageProperty();
- storageProperty.setSortedInsert(false);
- if (storeType.equalsIgnoreCase("RAW")) {
- storageProperty.setSupportsInsertInto(false);
- } else {
- storageProperty.setSupportsInsertInto(true);
- }
+ private static final StorageProperty FileStorageProperties = new StorageProperty(true, true, true, true);
+ private static final FormatProperty GeneralFileProperties = new FormatProperty(false);
+ private static final FormatProperty HFileProperties = new FormatProperty(true);
- return storageProperty;
+ @Override
+ public StorageProperty getProperty() {
+ return FileStorageProperties;
}
@Override
- public void close() {
+ public FormatProperty getFormatProperty(String format) {
+ if (format.equalsIgnoreCase("hbase")) {
+ return HFileProperties;
+ } else {
+ return GeneralFileProperties;
+ }
}
@Override
- public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+ public void close() {
}
@Override
- public void rollbackOutputCommit(LogicalNode node) throws IOException {
+ public void prepareTable(LogicalNode node) throws IOException {
}
@Override
- public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
+ public void rollbackTable(LogicalNode node) throws IOException {
}
@Override
- public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
- throws IOException {
- return null;
+ public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
}
@Override
- public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
- Schema schema, TableDesc tableDesc) throws IOException {
+ public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
+ Schema schema, TableDesc tableDesc) throws IOException {
return commitOutputData(queryContext, true);
}
@@ -879,8 +912,8 @@ public class FileTablespace extends Tablespace {
Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
Path finalOutputDir;
- if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
- finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+ if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) {
+ finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI));
try {
FileSystem fs = stagingResultDir.getFileSystem(conf);
@@ -949,7 +982,7 @@ public class FileTablespace extends Tablespace {
if (fs.exists(finalOutputDir)) {
fs.mkdirs(oldTableDir);
- for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) {
+ for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
fs.rename(status.getPath(), oldTableDir);
}
@@ -971,7 +1004,7 @@ public class FileTablespace extends Tablespace {
if (movedToOldTable && !committed) {
// if commit is failed, recover the old data
- for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) {
+ for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
fs.delete(status.getPath(), true);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 66c7f13..49485f5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -82,8 +82,9 @@ public class HashShuffleAppenderManager {
if (!fs.exists(dataFile.getParent())) {
fs.mkdirs(dataFile.getParent());
}
- FileAppender appender = (FileAppender)((FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf))
- .getAppender(meta, outSchema, dataFile);
+
+ FileTablespace space = (FileTablespace) TableSpaceManager.get(dataFile.toUri()).get();
+ FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile);
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 75ad0d5..ab63d55 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -122,7 +122,7 @@ public class TestCompressionStorages {
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
- Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+ Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -154,7 +154,7 @@ public class TestCompressionStorages {
FileFragment[] tablets = new FileFragment[1];
tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
- Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+ Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
if (storeType.equalsIgnoreCase("CSV")) {
if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 9726ecc..2d919cd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -103,7 +103,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
FileFragment fragment = getFileFragment("testErrorTolerance1.json");
- Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
Tuple tuple;
@@ -125,7 +125,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
FileFragment fragment = getFileFragment("testErrorTolerance1.json");
- Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
assertNotNull(scanner.next());
@@ -147,7 +147,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
FileFragment fragment = getFileFragment("testErrorTolerance2.json");
- Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
try {
@@ -166,7 +166,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
FileFragment fragment = getFileFragment("testErrorTolerance3.json");
- Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
deleted file mode 100644
index a6d6077..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class TestFileStorageManager {
- private TajoConf conf;
- private static String TEST_PATH = "target/test-data/TestFileStorageManager";
- private Path testDir;
- private FileSystem fs;
-
- @Before
- public void setUp() throws Exception {
- conf = new TajoConf();
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public final void testGetScannerAndAppender() throws IOException {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age",Type.INT4);
- schema.addColumn("name",Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta("CSV");
-
- VTuple[] tuples = new VTuple[4];
- for(int i=0; i < tuples.length; i++) {
- tuples[i] = new VTuple(new Datum[] {
- DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 32),
- DatumFactory.createText("name" + i)});
- }
-
- Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
- fs.mkdirs(path.getParent());
- FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
- assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
-
- Appender appender = fileStorageManager.getAppender(meta, schema, path);
- appender.init();
- for(Tuple t : tuples) {
- appender.addTuple(t);
- }
- appender.close();
-
- Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path);
- scanner.init();
- int i=0;
- while(scanner.next() != null) {
- i++;
- }
- assertEquals(4,i);
- }
-
- @Test
- public void testGetSplit() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
- conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(1).build();
- cluster.waitClusterUp();
- TajoConf tajoConf = new TajoConf(conf);
- tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
- int testCount = 10;
- Path tablePath = new Path("/testGetSplit");
- try {
- DistributedFileSystem fs = cluster.getFileSystem();
-
- // Create test partitions
- List<Path> partitions = Lists.newArrayList();
- for (int i =0; i < testCount; i++){
- Path tmpFile = new Path(tablePath, String.valueOf(i));
- DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
- partitions.add(tmpFile);
- }
-
- assertTrue(fs.exists(tablePath));
- FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf);
- assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age",Type.INT4);
- schema.addColumn("name",Type.TEXT);
- TableMeta meta = CatalogUtil.newTableMeta("CSV");
-
- List<Fragment> splits = Lists.newArrayList();
- // Get FileFragments in partition batch
- splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
- assertEquals(testCount, splits.size());
- // -1 is unknown volumeId
- assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-
- splits.clear();
- splits.addAll(sm.getSplits("data", meta, schema,
- partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
- assertEquals(testCount / 2, splits.size());
- assertEquals(1, splits.get(0).getHosts().length);
- assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
- fs.close();
- } finally {
- cluster.shutdown(true);
- }
- }
-
- @Test
- public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
- conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(2).build();
- cluster.waitClusterUp();
-
- TajoConf tajoConf = new TajoConf(conf);
- tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
- int testCount = 10;
- Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
- try {
- DistributedFileSystem fs = cluster.getFileSystem();
-
- // Create test files
- for (int i = 0; i < testCount; i++) {
- Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
- DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
- }
- assertTrue(fs.exists(tablePath));
- FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf);
- assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT4);
- schema.addColumn("name", Type.TEXT);
- TableMeta meta = CatalogUtil.newTableMeta("CSV");
-
- List<Fragment> splits = Lists.newArrayList();
- splits.addAll(sm.getSplits("data", meta, schema, tablePath));
-
- assertEquals(testCount, splits.size());
- assertEquals(2, splits.get(0).getHosts().length);
- assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
- assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
- fs.close();
- } finally {
- cluster.shutdown(true);
- }
- }
-
- @Test
- public void testStoreType() throws Exception {
- final Configuration hdfsConf = new HdfsConfiguration();
- String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
- hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
- hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
- hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
- .numDataNodes(2).build();
- cluster.waitClusterUp();
-
- TajoConf tajoConf = new TajoConf(hdfsConf);
- tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
- try {
- /* Local FileSystem */
- FileTablespace sm = (FileTablespace) TableSpaceManager.getStorageManager(conf, "CSV");
- assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
- /* Distributed FileSystem */
- sm = (FileTablespace) TableSpaceManager.getStorageManager(tajoConf, "CSV");
- assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
- assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri());
- } finally {
- cluster.shutdown(true);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index a6c238b..9237e07 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -57,7 +57,7 @@ public class TestFileSystems {
public TestFileSystems(FileSystem fs) throws IOException {
this.fs = fs;
this.conf = new TajoConf(fs.getConf());
- sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+ sm = TableSpaceManager.getLocalFs();
testDir = getTestDir(this.fs, TEST_PATH);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
new file mode 100644
index 0000000..ec3e143
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class TestFileTablespace {
+ private TajoConf conf;
+ private static String TEST_PATH = "target/test-data/TestFileTablespace";
+ private Path testDir;
+ private FileSystem localFs;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new TajoConf();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ localFs = testDir.getFileSystem(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public final void testGetScannerAndAppender() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age",Type.INT4);
+ schema.addColumn("name",Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta("CSV");
+
+ VTuple[] tuples = new VTuple[4];
+ for(int i=0; i < tuples.length; i++) {
+ tuples[i] = new VTuple(new Datum[] {
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i + 32),
+ DatumFactory.createText("name" + i)});
+ }
+
+ Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
+ localFs.mkdirs(path.getParent());
+ FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getLocalFs();
+ assertEquals(localFs.getUri(), fileStorageManager.getFileSystem().getUri());
+
+ Appender appender = fileStorageManager.getAppender(meta, schema, path);
+ appender.init();
+ for(Tuple t : tuples) {
+ appender.addTuple(t);
+ }
+ appender.close();
+
+ Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path);
+ scanner.init();
+ int i=0;
+ while(scanner.next() != null) {
+ i++;
+ }
+ assertEquals(4,i);
+ }
+
+ @Test
+ public void testGetSplit() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+ cluster.waitClusterUp();
+ TajoConf tajoConf = new TajoConf(conf);
+ tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+ int testCount = 10;
+ Path tablePath = new Path("/testGetSplit");
+ try {
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create test partitions
+ List<Path> partitions = Lists.newArrayList();
+ for (int i =0; i < testCount; i++){
+ Path tmpFile = new Path(tablePath, String.valueOf(i));
+ DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
+ partitions.add(tmpFile);
+ }
+
+ assertTrue(fs.exists(tablePath));
+ FileTablespace space = new FileTablespace("testGetSplit", fs.getUri());
+ space.init(new TajoConf(conf));
+ assertEquals(fs.getUri(), space.getUri());
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age",Type.INT4);
+ schema.addColumn("name",Type.TEXT);
+ TableMeta meta = CatalogUtil.newTableMeta("CSV");
+
+ List<Fragment> splits = Lists.newArrayList();
+ // Get FileFragments in partition batch
+ splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+ assertEquals(testCount, splits.size());
+ // -1 is unknown volumeId
+ assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+
+ splits.clear();
+ splits.addAll(space.getSplits("data", meta, schema,
+ partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
+ assertEquals(testCount / 2, splits.size());
+ assertEquals(1, splits.get(0).getHosts().length);
+ assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ fs.close();
+ } finally {
+ cluster.shutdown(true);
+ }
+ }
+
+ @Test
+ public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2).build();
+ cluster.waitClusterUp();
+
+ TajoConf tajoConf = new TajoConf(conf);
+ tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+ int testCount = 10;
+ Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
+ try {
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create test files
+ for (int i = 0; i < testCount; i++) {
+ Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
+ DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
+ }
+ assertTrue(fs.exists(tablePath));
+
+ FileTablespace sm = new FileTablespace("testGetSplitWithBlockStorageLocationsBatching", fs.getUri());
+ sm.init(new TajoConf(conf));
+
+ assertEquals(fs.getUri(), sm.getUri());
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT4);
+ schema.addColumn("name", Type.TEXT);
+ TableMeta meta = CatalogUtil.newTableMeta("CSV");
+
+ List<Fragment> splits = Lists.newArrayList();
+ splits.addAll(sm.getSplits("data", meta, schema, tablePath));
+
+ assertEquals(testCount, splits.size());
+ assertEquals(2, splits.get(0).getHosts().length);
+ assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
+ assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+ fs.close();
+ } finally {
+ cluster.shutdown(true);
+ }
+ }
+
+ @Test
+ public void testGetFileTablespace() throws Exception {
+ final Configuration hdfsConf = new HdfsConfiguration();
+ String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+ hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+ hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+ hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+ final MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(hdfsConf).numDataNodes(2).build();
+ cluster.waitClusterUp();
+ URI uri = URI.create(cluster.getFileSystem().getUri() + "/tajo");
+
+ Optional<Tablespace> existingTs = Optional.absent();
+ try {
+ /* Local FileSystem */
+ FileTablespace space = TableSpaceManager.getLocalFs();
+ assertEquals(localFs.getUri(), space.getFileSystem().getUri());
+
+ FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri);
+ distTablespace.init(conf);
+ existingTs = TableSpaceManager.addTableSpaceForTest(distTablespace);
+
+ /* Distributed FileSystem */
+ space = (FileTablespace) TableSpaceManager.get(uri).get();
+ assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
+
+ space = (FileTablespace) TableSpaceManager.getByName("testGetFileTablespace").get();
+ assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
+
+ } finally {
+
+ if (existingTs.isPresent()) {
+ TableSpaceManager.addTableSpaceForTest(existingTs.get());
+ }
+
+ cluster.shutdown(true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 266f906..c13ce16 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -65,7 +65,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "line.data");
- FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+ FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -118,7 +118,7 @@ public class TestLineReader {
meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName());
- FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+ FileAppender appender = (FileAppender) (TableSpaceManager.getLocalFs()).getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -176,7 +176,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "testLineDelimitedReader");
- FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+ FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -279,7 +279,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
- FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+ FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 03a601d..79928ff 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -94,7 +94,7 @@ public class TestMergeScanner {
conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
- sm = TableSpaceManager.getFileStorageManager(conf);
+ sm = TableSpaceManager.getLocalFs();
}
@Test
@@ -114,7 +114,7 @@ public class TestMergeScanner {
}
Path table1Path = new Path(testDir, storeType + "_1.data");
- Appender appender1 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
+ Appender appender1 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
appender1.enableStats();
appender1.init();
int tupleNum = 10000;
@@ -136,7 +136,7 @@ public class TestMergeScanner {
}
Path table2Path = new Path(testDir, storeType + "_2.data");
- Appender appender2 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
+ Appender appender2 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
appender2.enableStats();
appender2.init();