You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:09:02 UTC
[10/15] tajo git commit: TAJO-1616: Implement TablespaceManager to
load Tablespaces. (missed commits)
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
new file mode 100644
index 0000000..26af769
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.Pair;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI;
+
+/**
+ * It handles available table spaces and cache TableSpace instances.
+ *
+ * Default tablespace must be a filesystem-based one.
+ * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode.
+ * Local file system can be a default tablespace if a Tajo cluster runs on a single machine.
+ */
+public class TablespaceManager implements StorageService {
+ private static final Log LOG = LogFactory.getLog(TablespaceManager.class);
+
+ public static final String DEFAULT_CONFIG_FILE = "storage-default.json";
+ public static final String SITE_CONFIG_FILE = "storage-site.json";
+
+ /** default tablespace name */
+ public static final String DEFAULT_TABLESPACE_NAME = "default";
+
+ private final static TajoConf systemConf = new TajoConf();
+ private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
+
+ // The relation ship among name, URI, Tablespaces must be kept 1:1:1.
+ protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap();
+ protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap();
+
+ protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
+ protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
+
+ public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class};
+
+ static {
+ instance = new TablespaceManager();
+ }
+ /**
+ * Singleton instance
+ */
+ private static final TablespaceManager instance;
+
+ private TablespaceManager() {
+ initForDefaultConfig(); // loading storage-default.json
+ initSiteConfig(); // storage-site.json will override the configs of storage-default.json
+ addWarehouseAsSpace(); // adding a warehouse directory for a default tablespace
+ addLocalFsTablespace(); // adding a tablespace using local file system by default
+ }
+
+ private void addWarehouseAsSpace() {
+ Path warehouseDir = TajoConf.getWarehouseDir(systemConf);
+ registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false);
+ }
+
+ private void addLocalFsTablespace() {
+ if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) {
+ String tmpName = UUID.randomUUID().toString();
+ registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false);
+ }
+ }
+
+ public static TablespaceManager getInstance() {
+ return instance;
+ }
+
+ private void initForDefaultConfig() {
+ JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE);
+ if (json == null) {
+ throw new IllegalStateException("There is no " + SITE_CONFIG_FILE);
+ }
+ applyConfig(json, false);
+ }
+
+ private void initSiteConfig() {
+ JSONObject json = loadFromConfig(SITE_CONFIG_FILE);
+
+ // if there is no storage-site.json file, nothing happen.
+ if (json != null) {
+ applyConfig(json, true);
+ }
+ }
+
+ private JSONObject loadFromConfig(String fileName) {
+ String json;
+ try {
+ json = FileUtil.readTextFileFromResource(fileName);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (json != null) {
+ return parseJson(json);
+ } else {
+ return null;
+ }
+ }
+
+ private static JSONObject parseJson(String json) {
+ try {
+ return (JSONObject) parser.parse(json);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void applyConfig(JSONObject json, boolean override) {
+ loadStorages(json);
+ loadTableSpaces(json, override);
+ }
+
+ private void loadStorages(JSONObject json) {
+ JSONObject spaces = (JSONObject) json.get(KEY_STORAGES);
+
+ if (spaces != null) {
+ Pair<String, Class<? extends Tablespace>> pair = null;
+ for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+
+ try {
+ pair = extractStorage(entry);
+ } catch (ClassNotFoundException e) {
+ LOG.warn(e);
+ continue;
+ }
+
+ TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond());
+ }
+ }
+ }
+
+ private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry)
+ throws ClassNotFoundException {
+
+ String storageType = entry.getKey();
+ JSONObject storageDesc = (JSONObject) entry.getValue();
+ String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
+
+ return new Pair<String, Class<? extends Tablespace>>(
+ storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
+ }
+
+ private void loadTableSpaces(JSONObject json, boolean override) {
+ JSONObject spaces = (JSONObject) json.get(KEY_SPACES);
+
+ if (spaces != null) {
+ for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+ AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override);
+ }
+ }
+ }
+
+ public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) {
+ boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default"));
+ URI spaceUri = URI.create(spaceDesc.getAsString("uri"));
+
+ if (defaultSpace) {
+ registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override);
+ }
+ registerTableSpace(spaceName, spaceUri, spaceDesc, true, override);
+ }
+
+ private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc,
+ boolean visible, boolean override) {
+ Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible);
+ tableSpace.setVisible(visible);
+
+ try {
+ tableSpace.init(systemConf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ putTablespace(tableSpace, override);
+
+ // If the arbitrary path is allowed, root uri is also added as a tablespace
+ if (tableSpace.getProperty().isArbitraryPathAllowed()) {
+ URI rootUri = tableSpace.getRootUri();
+ // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace.
+ if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) {
+ String tmpName = UUID.randomUUID().toString();
+ registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
+ }
+ }
+ }
+
+ private static void putTablespace(Tablespace space, boolean override) {
+ // It is a device to keep the relationship among name, URI, and tablespace 1:1:1.
+
+ boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName());
+ boolean uriExist = TABLE_SPACES.containsKey(space.uri);
+
+ boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri());
+ mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space);
+
+ if (!override && mismatch) {
+ throw new RuntimeException("Name or URI of Tablespace must be unique.");
+ }
+
+ SPACES_URIS_MAP.put(space.getName(), space.getUri());
+ // We must guarantee that the same uri results in the same tablespace instance.
+ TABLE_SPACES.put(space.getUri(), space);
+ }
+
+ /**
+ * Return length of the fragment.
+ * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
+ *
+ * @param conf Tajo system property
+ * @param fragment Fragment
+ * @return
+ */
+ public static long guessFragmentVolume(TajoConf conf, Fragment fragment) {
+ if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+ return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+ } else {
+ return fragment.getLength();
+ }
+ }
+
+ public static final String KEY_STORAGES = "storages"; // storages
+ public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler
+ public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format
+
+ public static final String KEY_SPACES = "spaces";
+
+ private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) {
+ Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri);
+ Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme());
+
+ if (clazz == null) {
+ throw new RuntimeException("There is no tablespace for " + uri.toString());
+ }
+
+ try {
+ Constructor<? extends Tablespace> constructor =
+ (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz);
+
+ if (constructor == null) {
+ constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM);
+ constructor.setAccessible(true);
+ CONSTRUCTORS.put(clazz, constructor);
+ }
+
+ return constructor.newInstance(new Object[]{spaceName, uri});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @VisibleForTesting
+ public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) {
+ Tablespace existing;
+ synchronized (SPACES_URIS_MAP) {
+ // Remove existing one
+ SPACES_URIS_MAP.remove(space.getName());
+ existing = TABLE_SPACES.remove(space.getUri());
+
+ // Add anotherone for test
+ registerTableSpace(space.name, space.uri, null, true, true);
+ }
+ // if there is an existing one, return it.
+ return Optional.fromNullable(existing);
+ }
+
+ public Iterable<String> getSupportSchemes() {
+ return TABLE_SPACE_HANDLERS.keySet();
+ }
+
+ /**
+ * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+ *
+ * @param uri Table or Table Fragment URI.
+ * @param <T> Tablespace class type
+ * @return Tablespace. If uri is null, the default tablespace will be returned.
+ */
+ public static <T extends Tablespace> Optional<T> get(@Nullable String uri) {
+
+ if (uri == null || uri.isEmpty()) {
+ return (Optional<T>) Optional.of(getDefault());
+ }
+
+ Tablespace lastOne = null;
+
+ // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
+ // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
+ for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
+ if (uri.startsWith(entry.getKey().toString())) {
+ lastOne = entry.getValue();
+ }
+ }
+ return (Optional<T>) Optional.fromNullable(lastOne);
+ }
+
+ /**
+ * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+ *
+ * @param uri Table or Table Fragment URI.
+ * @param <T> Tablespace class type
+ * @return Tablespace. If uri is null, the default tablespace will be returned.
+ */
+ public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) {
+ if (uri == null) {
+ return (Optional<T>) Optional.of(getDefault());
+ } else {
+ return (Optional<T>) get(uri.toString());
+ }
+ }
+
+ /**
+ * It returns the default tablespace. This method ensures that it always return the tablespace.
+ *
+ * @return
+ */
+ public static <T extends Tablespace> T getDefault() {
+ return (T) getByName(DEFAULT_TABLESPACE_NAME).get();
+ }
+
+ public static <T extends Tablespace> T getLocalFs() {
+ return (T) get(LOCAL_FS_URI).get();
+ }
+
+ public static Optional<? extends Tablespace> getByName(String name) {
+ URI uri = SPACES_URIS_MAP.get(name);
+ if (uri != null) {
+ return Optional.of(TABLE_SPACES.get(uri));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
+ for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) {
+ String uriScheme = entry.getKey().getScheme();
+ if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) {
+ return Optional.of(entry.getValue());
+ }
+ }
+
+ return Optional.absent();
+ }
+
+ @Override
+ public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
+ Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get();
+ return space.getTableUri(databaseName, tableName);
+ }
+
+ public static Iterable<Tablespace> getAllTablespaces() {
+ return TABLE_SPACES.values();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 916aae7..7943134 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tuple;
import java.io.IOException;
@@ -47,7 +47,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
public void init() throws IOException {
super.init();
- HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(uri).get();
+ HBaseTablespace space = (HBaseTablespace) TablespaceManager.get(uri).get();
HConnection hconn = space.getConnection();
htable = hconn.getTable(columnMapping.getHbaseTableName());
htable.setAutoFlushTo(false);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 16f4c14..7369897 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -133,7 +133,7 @@ public class HBaseScanner implements Scanner {
rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
- HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get();
+ HBaseTablespace space = (HBaseTablespace) TablespaceManager.get(fragment.getUri()).get();
hbaseConf = space.getHbaseConf();
initScanner();
}
@@ -181,7 +181,7 @@ public class HBaseScanner implements Scanner {
}
if (htable == null) {
- HConnection hconn = ((HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get()).getConnection();
+ HConnection hconn = ((HBaseTablespace) TablespaceManager.get(fragment.getUri()).get()).getConnection();
htable = hconn.getTable(fragment.getHbaseTableName());
}
scanner = htable.getScanner(scan);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 5fac0cf..18bb7ed 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -51,10 +51,7 @@ import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.BytesUtils;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.*;
import java.io.BufferedReader;
import java.io.IOException;
@@ -68,9 +65,9 @@ import java.util.*;
public class HBaseTablespace extends Tablespace {
private static final Log LOG = LogFactory.getLog(HBaseTablespace.class);
- public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty(false, true, true, false);
-
- public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true);
+ public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty("hbase", false, true, false);
+ public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true);
+ public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false);
private Configuration hbaseConf;
@@ -572,6 +569,15 @@ public class HBaseTablespace extends Tablespace {
}
@Override
+ public Appender getAppenderForInsertRow(OverridableConf queryContext,
+ TaskAttemptId taskAttemptId,
+ TableMeta meta,
+ Schema schema,
+ Path workDir) throws IOException {
+ return new HBasePutAppender(conf, uri, taskAttemptId, schema, meta, workDir);
+ }
+
+ @Override
public Appender getAppender(OverridableConf queryContext,
TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
throws IOException {
@@ -1096,8 +1102,14 @@ public class HBaseTablespace extends Tablespace {
}
@Override
- public FormatProperty getFormatProperty(String format) {
- return HFILE_FORMAT_PROPERTIES;
+ public FormatProperty getFormatProperty(TableMeta meta) {
+ KeyValueSet tableProperty = meta.getOptions();
+ if (tableProperty.isTrue(HBaseStorageConstants.INSERT_PUT_MODE) ||
+ tableProperty.isTrue(StorageConstants.INSERT_DIRECTLY)) {
+ return PUT_MODE_PROPERTIES;
+ } else {
+ return HFILE_FORMAT_PROPERTIES;
+ }
}
public void prepareTable(LogicalNode node) throws IOException {
@@ -1134,6 +1146,24 @@ public class HBaseTablespace extends Tablespace {
}
@Override
+ public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+ if (meta.getOptions().isTrue(HBaseStorageConstants.INSERT_PUT_MODE)) {
+ throw new IOException("Staging phase is not supported in this storage.");
+ } else {
+ return TablespaceManager.getDefault().getStagingUri(context, queryId, meta);
+ }
+ }
+
+ public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context,
+ TableMeta meta) throws IOException {
+ if (!meta.getOptions().isTrue(HBaseStorageConstants.INSERT_PUT_MODE)) {
+ return TablespaceManager.getDefault().prepareStagingSpace(conf, queryId, context, meta);
+ } else {
+ throw new IOException("Staging phase is not supported in this storage.");
+ }
+ }
+
+ @Override
public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
if (tableDesc != null) {
Schema tableSchema = tableDesc.getSchema();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
index f7cbb5a..f0c8f15 100644
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
@@ -25,7 +25,7 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.Pair;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -45,7 +45,7 @@ public class TestHBaseTableSpace {
String tableSpaceUri = "hbase:zk://host1:2171";
HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
hBaseTablespace.init(new TajoConf());
- TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
+ TablespaceManager.addTableSpaceForTest(hBaseTablespace);
}
@Test
@@ -58,8 +58,8 @@ public class TestHBaseTableSpace {
@Test
public void testTablespaceHandler() throws Exception {
- assertTrue((TableSpaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
- assertTrue((TableSpaceManager.get(URI.create("hbase:zk://host1:2171")).get())
+ assertTrue((TablespaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
+ assertTrue((TablespaceManager.get(URI.create("hbase:zk://host1:2171")).get())
instanceof HBaseTablespace);
}
@@ -73,7 +73,7 @@ public class TestHBaseTableSpace {
EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
scanNode.setQual(evalNodeA);
- HBaseTablespace storageManager = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+ HBaseTablespace storageManager = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
assertNotNull(indexEvals);
assertEquals(1, indexEvals.size());
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 081fa3f..efe2bfd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -58,7 +58,7 @@ public abstract class FileAppender implements Appender {
throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
}
- Optional<FileTablespace> spaceResult = TableSpaceManager.get(workDir.toUri());
+ Optional<FileTablespace> spaceResult = TablespaceManager.get(workDir.toUri());
if (!spaceResult.isPresent()) {
throw new IllegalStateException("No TableSpace for " + workDir.toUri());
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 2ce1f09..3b63012 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -19,14 +19,17 @@
package org.apache.tajo.storage;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.*;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -92,8 +95,12 @@ public class FileTablespace extends Tablespace {
}
};
+ private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true);
+ private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true);
+
protected FileSystem fs;
- protected Path basePath;
+ protected Path spacePath;
+ protected Path stagingRootPath;
protected boolean blocksMetadataEnabled;
private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
@@ -103,8 +110,9 @@ public class FileTablespace extends Tablespace {
@Override
protected void storageInit() throws IOException {
- this.basePath = new Path(uri);
- this.fs = basePath.getFileSystem(conf);
+ this.spacePath = new Path(uri);
+ this.fs = spacePath.getFileSystem(conf);
+ this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR)));
this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString());
this.blocksMetadataEnabled =
@@ -167,7 +175,7 @@ public class FileTablespace extends Tablespace {
@Override
public URI getTableUri(String databaseName, String tableName) {
- return StorageUtil.concatPath(basePath, databaseName, tableName).toUri();
+ return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri();
}
private String partitionPath = "";
@@ -192,12 +200,12 @@ public class FileTablespace extends Tablespace {
}
public FileFragment[] split(String tableName) throws IOException {
- Path tablePath = new Path(basePath, tableName);
+ Path tablePath = new Path(spacePath, tableName);
return split(tableName, tablePath, fs.getDefaultBlockSize());
}
public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
- Path tablePath = new Path(basePath, tableName);
+ Path tablePath = new Path(spacePath, tableName);
return split(tableName, tablePath, fragmentSize);
}
@@ -491,30 +499,6 @@ public class FileTablespace extends Tablespace {
}
/**
- * Generate the map of host and make them into Volume Ids.
- *
- */
- private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
- Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
- for (FileFragment frag : frags) {
- String[] hosts = frag.getHosts();
- int[] diskIds = frag.getDiskIds();
- for (int i = 0; i < hosts.length; i++) {
- Set<Integer> volumeList = volumeMap.get(hosts[i]);
- if (volumeList == null) {
- volumeList = new HashSet<Integer>();
- volumeMap.put(hosts[i], volumeList);
- }
-
- if (diskIds.length > 0 && diskIds[i] > -1) {
- volumeList.add(diskIds[i]);
- }
- }
- }
-
- return volumeMap;
- }
- /**
* Generate the list of files and make them into FileSplits.
*
* @throws IOException
@@ -674,7 +658,7 @@ public class FileTablespace extends Tablespace {
String simpleTableName = splitted[1];
// create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
- Path tablePath = StorageUtil.concatPath(basePath, databaseName, simpleTableName);
+ Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName);
tableDesc.setUri(tablePath.toUri());
} else {
Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given.");
@@ -851,22 +835,14 @@ public class FileTablespace extends Tablespace {
}
}
- private static final StorageProperty FileStorageProperties = new StorageProperty(true, true, true, true);
- private static final FormatProperty GeneralFileProperties = new FormatProperty(false);
- private static final FormatProperty HFileProperties = new FormatProperty(true);
-
@Override
public StorageProperty getProperty() {
return FileStorageProperties;
}
@Override
- public FormatProperty getFormatProperty(String format) {
- if (format.equalsIgnoreCase("hbase")) {
- return HFileProperties;
- } else {
- return GeneralFileProperties;
- }
+ public FormatProperty getFormatProperty(TableMeta meta) {
+ return GeneralFileProperties;
}
@Override
@@ -882,6 +858,84 @@ public class FileTablespace extends Tablespace {
}
@Override
+ public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+ String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");
+
+ Path stagingDir;
+ // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
+ // So, this query results won't be materialized as a part of a table.
+ // The result will be temporarily written in the staging directory.
+ if (outputPath.isEmpty()) {
+ // for temporarily written in the storage directory
+ stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
+ } else {
+ Optional<Tablespace> spaceResult = TablespaceManager.get(outputPath);
+ if (!spaceResult.isPresent()) {
+ throw new IOException("No registered Tablespace for " + outputPath);
+ }
+
+ Tablespace space = spaceResult.get();
+ if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
+ // If this space allows move operation, the staging directory will be underneath the final output table uri.
+ stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId));
+ } else {
+ stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
+ }
+ }
+
+ return stagingDir.toUri();
+ }
+
+ // query submission directory is private!
+ final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx--------
+ public static final String TMP_STAGING_DIR_PREFIX = ".staging";
+
+ public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta)
+ throws IOException {
+
+ String realUser;
+ String currentUser;
+ UserGroupInformation ugi;
+ ugi = UserGroupInformation.getLoginUser();
+ realUser = ugi.getShortUserName();
+ currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+
+ Path stagingDir = new Path(getStagingUri(context, queryId, meta));
+
+ ////////////////////////////////////////////
+ // Create Output Directory
+ ////////////////////////////////////////////
+
+ if (fs.exists(stagingDir)) {
+ throw new IOException("The staging directory '" + stagingDir + "' already exists");
+ }
+ fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+ FileStatus fsStatus = fs.getFileStatus(stagingDir);
+ String owner = fsStatus.getOwner();
+
+ if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
+ throw new IOException("The ownership on the user's query " +
+ "directory " + stagingDir + " is not as expected. " +
+ "It is owned by " + owner + ". The directory must " +
+ "be owned by the submitter " + currentUser + " or " +
+ "by " + realUser);
+ }
+
+ if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+ LOG.info("Permissions on staging directory " + stagingDir + " are " +
+ "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+ "to correct value " + STAGING_DIR_PERMISSION);
+ fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+ }
+
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+ fs.mkdirs(stagingResultDir);
+
+ return stagingDir.toUri();
+ }
+
+ @Override
public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
}
@@ -1257,4 +1311,6 @@ public class FileTablespace extends Tablespace {
return retValue;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index bd5502d..1d32291 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -85,7 +85,7 @@ public class HashShuffleAppenderManager {
fs.mkdirs(dataFile.getParent());
}
- FileTablespace space = (FileTablespace) TableSpaceManager.get(dataFile.toUri()).get();
+ FileTablespace space = (FileTablespace) TablespaceManager.get(dataFile.toUri()).get();
FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile);
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index ab63d55..f50a20d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -122,7 +122,7 @@ public class TestCompressionStorages {
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -154,7 +154,7 @@ public class TestCompressionStorages {
FileFragment[] tablets = new FileFragment[1];
tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
if (storeType.equalsIgnoreCase("CSV")) {
if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 2d919cd..ca5885c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -103,7 +103,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
FileFragment fragment = getFileFragment("testErrorTolerance1.json");
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
Tuple tuple;
@@ -125,7 +125,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
FileFragment fragment = getFileFragment("testErrorTolerance1.json");
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
assertNotNull(scanner.next());
@@ -147,7 +147,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
FileFragment fragment = getFileFragment("testErrorTolerance2.json");
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
try {
@@ -166,7 +166,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
FileFragment fragment = getFileFragment("testErrorTolerance3.json");
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index 9237e07..1119968 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -57,7 +57,7 @@ public class TestFileSystems {
public TestFileSystems(FileSystem fs) throws IOException {
this.fs = fs;
this.conf = new TajoConf(fs.getConf());
- sm = TableSpaceManager.getLocalFs();
+ sm = TablespaceManager.getLocalFs();
testDir = getTestDir(this.fs, TEST_PATH);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index ec3e143..09b91ea 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -81,7 +81,7 @@ public class TestFileTablespace {
Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
localFs.mkdirs(path.getParent());
- FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getLocalFs();
+ FileTablespace fileStorageManager = (FileTablespace) TablespaceManager.getLocalFs();
assertEquals(localFs.getUri(), fileStorageManager.getFileSystem().getUri());
Appender appender = fileStorageManager.getAppender(meta, schema, path);
@@ -224,24 +224,24 @@ public class TestFileTablespace {
Optional<Tablespace> existingTs = Optional.absent();
try {
/* Local FileSystem */
- FileTablespace space = TableSpaceManager.getLocalFs();
+ FileTablespace space = TablespaceManager.getLocalFs();
assertEquals(localFs.getUri(), space.getFileSystem().getUri());
FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri);
distTablespace.init(conf);
- existingTs = TableSpaceManager.addTableSpaceForTest(distTablespace);
+ existingTs = TablespaceManager.addTableSpaceForTest(distTablespace);
/* Distributed FileSystem */
- space = (FileTablespace) TableSpaceManager.get(uri).get();
+ space = (FileTablespace) TablespaceManager.get(uri).get();
assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
- space = (FileTablespace) TableSpaceManager.getByName("testGetFileTablespace").get();
+ space = (FileTablespace) TablespaceManager.getByName("testGetFileTablespace").get();
assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
} finally {
if (existingTs.isPresent()) {
- TableSpaceManager.addTableSpaceForTest(existingTs.get());
+ TablespaceManager.addTableSpaceForTest(existingTs.get());
}
cluster.shutdown(true);
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index c13ce16..7410778 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -65,7 +65,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "line.data");
- FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+ FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -118,7 +118,7 @@ public class TestLineReader {
meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName());
- FileAppender appender = (FileAppender) (TableSpaceManager.getLocalFs()).getAppender(
+ FileAppender appender = (FileAppender) (TablespaceManager.getLocalFs()).getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -176,7 +176,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "testLineDelimitedReader");
- FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+ FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -279,7 +279,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
- FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+ FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 79928ff..331d3e8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -94,7 +94,7 @@ public class TestMergeScanner {
conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
- sm = TableSpaceManager.getLocalFs();
+ sm = TablespaceManager.getLocalFs();
}
@Test
@@ -114,7 +114,7 @@ public class TestMergeScanner {
}
Path table1Path = new Path(testDir, storeType + "_1.data");
- Appender appender1 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
+ Appender appender1 = TablespaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
appender1.enableStats();
appender1.init();
int tupleNum = 10000;
@@ -136,7 +136,7 @@ public class TestMergeScanner {
}
Path table2Path = new Path(testDir, storeType + "_2.data");
- Appender appender2 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
+ Appender appender2 = TablespaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
appender2.enableStats();
appender2.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index ce2a926..dbfdac3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -155,7 +155,7 @@ public class TestStorages {
TableMeta meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "Splitable.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -210,7 +210,7 @@ public class TestStorages {
TableMeta meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "Splitable.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -271,7 +271,7 @@ public class TestStorages {
}
Path tablePath = new Path(testDir, "testProjection.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.init();
int tupleNum = 10000;
@@ -347,7 +347,7 @@ public class TestStorages {
meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
}
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Path tablePath = new Path(testDir, "testVariousTypes.data");
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.init();
@@ -425,7 +425,7 @@ public class TestStorages {
}
Path tablePath = new Path(testDir, "testVariousTypes.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.init();
@@ -469,7 +469,7 @@ public class TestStorages {
FileStatus status = fs.getFileStatus(tablePath);
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
Tuple retrieved;
@@ -513,7 +513,7 @@ public class TestStorages {
meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName());
Path tablePath = new Path(testDir, "testVariousTypes.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -544,7 +544,7 @@ public class TestStorages {
assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
Tuple retrieved;
@@ -582,7 +582,7 @@ public class TestStorages {
meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
Path tablePath = new Path(testDir, "testVariousTypes.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -613,7 +613,7 @@ public class TestStorages {
assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
Tuple retrieved;
@@ -651,7 +651,7 @@ public class TestStorages {
meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
Path tablePath = new Path(testDir, "testVariousTypes.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -682,7 +682,7 @@ public class TestStorages {
assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
assertTrue(scanner instanceof SequenceFileScanner);
@@ -724,7 +724,7 @@ public class TestStorages {
meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
Path tablePath = new Path(testDir, "testVariousTypes.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -756,7 +756,7 @@ public class TestStorages {
assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
assertTrue(scanner instanceof SequenceFileScanner);
@@ -786,7 +786,7 @@ public class TestStorages {
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
Path tablePath = new Path(testDir, "testTime.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, schema, tablePath);
appender.init();
@@ -801,7 +801,7 @@ public class TestStorages {
FileStatus status = fs.getFileStatus(tablePath);
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
Tuple retrieved;
@@ -827,7 +827,7 @@ public class TestStorages {
TableMeta meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "Seekable.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -869,7 +869,7 @@ public class TestStorages {
long readBytes = 0;
long readRows = 0;
for (long offset : offsets) {
- scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema,
+ scanner = TablespaceManager.getLocalFs().getScanner(meta, schema,
new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
scanner.init();
@@ -917,7 +917,7 @@ public class TestStorages {
conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize);
}
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Path tablePath = new Path(testDir, "testMaxValue.data");
Appender appender = sm.getAppender(meta, schema, tablePath);
@@ -972,7 +972,7 @@ public class TestStorages {
meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
Path tablePath = new Path(testDir, "testLessThanSchemaSize.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, dataSchema, tablePath);
appender.init();
@@ -998,7 +998,7 @@ public class TestStorages {
inSchema.addColumn("col5", Type.INT8);
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, inSchema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment);
Schema target = new Schema();
@@ -1036,7 +1036,7 @@ public class TestStorages {
meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
Path tablePath = new Path(testDir, "test_storetype_oversize.data");
- FileTablespace sm = TableSpaceManager.getLocalFs();
+ FileTablespace sm = TablespaceManager.getLocalFs();
Appender appender = sm.getAppender(meta, dataSchema, tablePath);
appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 1a62f52..22fb607 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -89,7 +89,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testFindValue_" + storeType);
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
@@ -178,7 +178,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
- FileAppender appender = (FileAppender) ((FileTablespace)TableSpaceManager.getLocalFs())
+ FileAppender appender = (FileAppender) ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(meta, schema, tablePath);
appender.init();
@@ -257,7 +257,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
for (int i = 0; i < TUPLE_NUM; i += 2) {
@@ -327,7 +327,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
@@ -418,7 +418,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
- Appender appender = (((FileTablespace)TableSpaceManager.getLocalFs()))
+ Appender appender = (((FileTablespace) TablespaceManager.getLocalFs()))
.getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
@@ -498,7 +498,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testFindMinValue" + storeType);
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(meta, schema, tablePath);
appender.init();
@@ -582,7 +582,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testMinMax_" + storeType);
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
@@ -687,7 +687,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(meta, schema, tablePath);
appender.init();
@@ -768,7 +768,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(meta, schema, tablePath);
appender.init();
@@ -860,7 +860,7 @@ public class TestBSTIndex {
meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 5ad7a27..72810fd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex {
Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
fs.mkdirs(tablePath.getParent());
- Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
for (int i = 0; i < TUPLE_NUM; i++) {
@@ -166,7 +166,7 @@ public class TestSingleCSVFileBSTIndex {
Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
"table1.csv");
fs.mkdirs(tablePath.getParent());
- Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+ Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
index 2fbf5d6..8095081 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@ -69,7 +69,7 @@ public class TestJsonSerDe {
FileSystem fs = FileSystem.getLocal(conf);
FileStatus status = fs.getFileStatus(tablePath);
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
- Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
scanner.init();
Tuple tuple = scanner.next();