You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/24 00:49:16 UTC
[3/7] tajo git commit: TAJO-1616: Implement TablespaceManager to load
Tablespaces.
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml
index f7c9676..f1d3438 100644
--- a/tajo-storage/tajo-storage-common/pom.xml
+++ b/tajo-storage/tajo-storage-common/pom.xml
@@ -58,6 +58,12 @@ limitations under the License.
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/resources/*.json</exclude>
+ <exclude>src/test/resources/*.json</exclude>
+ </excludes>
+ </configuration>
<executions>
<execution>
<phase>verify</phase>
@@ -293,6 +299,10 @@ limitations under the License.
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
+ <dependency>
+ <groupId>net.minidev</groupId>
+ <artifactId>json-smart</artifactId>
+ </dependency>
</dependencies>
<profiles>
@@ -334,4 +344,4 @@ limitations under the License.
</plugin>
</plugins>
</reporting>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
new file mode 100644
index 0000000..0f0cd10
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+public class FormatProperty {
+ private boolean sortedInsertRequired;
+
+ public FormatProperty(boolean sortedInsertRequired) {
+ this.sortedInsertRequired = sortedInsertRequired;
+ }
+
+ public boolean sortedInsertRequired() {
+ return sortedInsertRequired;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index a8926a0..ce573be 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -64,7 +64,7 @@ public class MergeScanner implements Scanner {
long numBytes = 0;
for (Fragment eachFileFragment: rawFragmentList) {
- long fragmentLength = Tablespace.getFragmentLength((TajoConf) conf, eachFileFragment);
+ long fragmentLength = TableSpaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment);
if (fragmentLength > 0) {
numBytes += fragmentLength;
fragments.add(eachFileFragment);
@@ -131,8 +131,7 @@ public class MergeScanner implements Scanner {
private Scanner getNextScanner() throws IOException {
if (iterator.hasNext()) {
currentFragment = iterator.next();
- currentScanner = TableSpaceManager.getStorageManager((TajoConf) conf, meta.getStoreType()).getScanner(meta, schema,
- currentFragment, target);
+ currentScanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target);
currentScanner.init();
return currentScanner;
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
new file mode 100644
index 0000000..12b236f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * It handles available table spaces and cache TableSpace instances.
+ */
+public class OldStorageManager {
+ private static final Log LOG = LogFactory.getLog(OldStorageManager.class);
+
+ /**
+ * Cache of scanner handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+ /**
+ * Cache of appender handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends Appender>>();
+ private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+ Configuration.class,
+ Schema.class,
+ TableMeta.class,
+ Fragment.class
+ };
+ private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+ Configuration.class,
+ TaskAttemptId.class,
+ Schema.class,
+ TableMeta.class,
+ Path.class
+ };
+ /**
+ * Cache of Tablespace.
+ * Key is manager key(warehouse path) + store type
+ */
+ private static final Map<String, Tablespace> storageManagers = Maps.newHashMap();
+ /**
+ * Cache of constructors for each class. Pins the classes so they
+ * can't be garbage collected until ReflectionUtils can be collected.
+ */
+ protected static Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+
+ /**
+ * Clear all class cache
+ */
+ @VisibleForTesting
+ protected synchronized static void clearCache() {
+ CONSTRUCTOR_CACHE.clear();
+ SCANNER_HANDLER_CACHE.clear();
+ APPENDER_HANDLER_CACHE.clear();
+ storageManagers.clear();
+ }
+
+ /**
+ * Close Tablespace
+ * @throws java.io.IOException
+ */
+ public static void shutdown() throws IOException {
+ synchronized(storageManagers) {
+ for (Tablespace eachTablespace : storageManagers.values()) {
+ eachTablespace.close();
+ }
+ }
+ clearCache();
+ }
+
+ /**
+ * Returns the proper Tablespace instance according to the storeType, keyed by the URI of
+ * the file system that hosts the warehouse directory.
+ *
+ * @param tajoConf Tajo system property.
+ * @param storeType Storage type
+ * @return the Tablespace returned by the URI-based overload of this method
+ * @throws IOException propagated from resolving the warehouse file system or from the
+ * URI-based overload
+ */
+ public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+ // The former null-FileSystem branch only forwarded a null URI to the overload below, which
+ // rejects null through Preconditions.checkNotNull, so it could never succeed and is dropped.
+ FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+ return getStorageManager(tajoConf, fileSystem.getUri(), storeType);
+ }
+
+ /**
+ * Returns the proper Tablespace instance according to the storeType
+ *
+ * @param tajoConf Tajo system property.
+ * @param uri Key that can identify each storage manager(may be a path)
+ * @param storeType Storage type
+ * @return
+ * @throws IOException
+ */
+ public static synchronized Tablespace getStorageManager(
+ TajoConf tajoConf, URI uri, String storeType) throws IOException {
+ Preconditions.checkNotNull(tajoConf);
+ Preconditions.checkNotNull(uri);
+ Preconditions.checkNotNull(storeType);
+
+ String typeName;
+ if (storeType.equalsIgnoreCase("HBASE")) {
+ typeName = "hbase";
+ } else {
+ typeName = "hdfs";
+ }
+
+ synchronized (storageManagers) {
+ String storeKey = typeName + "_" + uri.toString();
+ Tablespace manager = storageManagers.get(storeKey);
+
+ if (manager == null) {
+ Class<? extends Tablespace> storageManagerClass =
+ tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class);
+
+ if (storageManagerClass == null) {
+ throw new IOException("Unknown Storage Type: " + typeName);
+ }
+
+ try {
+ Constructor<? extends Tablespace> constructor =
+ (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
+ if (constructor == null) {
+ constructor = storageManagerClass.getDeclaredConstructor(TableSpaceManager.TABLESPACE_PARAM);
+ constructor.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
+ }
+ manager = constructor.newInstance(new Object[]{"noname", uri});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ manager.init(tajoConf);
+ storageManagers.put(storeKey, manager);
+ }
+
+ return manager;
+ }
+ }
+
+ /**
+ * Returns Scanner instance.
+ *
+ * @param conf The system property
+ * @param meta The table meta
+ * @param schema The input schema
+ * @param fragment The fragment for scanning
+ * @param target The output schema
+ * @return Scanner instance
+ * @throws IOException
+ */
+ public static synchronized SeekableScanner getSeekableScanner(
+ TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+ return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
+ }
+
+ /**
+ * Creates a scanner instance.
+ *
+ * @param theClass Concrete class of scanner
+ * @param conf System property
+ * @param schema Input schema
+ * @param meta Table meta data
+ * @param fragment The fragment for scanning
+ * @param <T>
+ * @return The scanner instance
+ */
+ public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
+ Fragment fragment) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+
+ /**
+ * Creates an appender instance through reflection.
+ *
+ * The constructor matching DEFAULT_APPENDER_PARAMS is looked up on first use of a class
+ * and kept in CONSTRUCTOR_CACHE for later calls. Note that the constructor is invoked
+ * with (conf, taskAttemptId, schema, meta, workDir), i.e. schema before meta.
+ *
+ * @param theClass Concrete class of appender
+ * @param conf System property
+ * @param taskAttemptId Task id
+ * @param meta Table meta data
+ * @param schema Input schema
+ * @param workDir Working directory
+ * @param <T> Type of the appender to create
+ * @return The appender instance
+ * @throws RuntimeException wrapping any exception raised while looking up or invoking the constructor
+ */
+ public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
+ TableMeta meta, Schema schema, Path workDir) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
index 6816d08..38d0734 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
@@ -19,22 +19,51 @@
package org.apache.tajo.storage;
public class StorageProperty {
- private boolean supportsInsertInto;
- private boolean sortedInsert;
+ private boolean movable;
+ private boolean writable;
+ private boolean insertable;
+ private boolean absolutePathAllowed;
- public boolean isSupportsInsertInto() {
- return supportsInsertInto;
+ public StorageProperty(boolean movable, boolean writable, boolean isInsertable, boolean absolutePathAllowed) {
+ this.movable = movable;
+ this.writable = writable;
+ this.insertable = isInsertable;
+ this.absolutePathAllowed = absolutePathAllowed;
}
- public void setSupportsInsertInto(boolean supportsInsertInto) {
- this.supportsInsertInto = supportsInsertInto;
+ /**
+ * Move-like operation is allowed
+ *
+ * @return true if move operation is available
+ */
+ public boolean isMovable() {
+ return movable;
}
- public boolean isSortedInsert() {
- return sortedInsert;
+ /**
+ * Is it Writable storage?
+ *
+ * @return true if this storage is writable.
+ */
+ public boolean isWritable() {
+ return writable;
}
- public void setSortedInsert(boolean sortedInsert) {
- this.sortedInsert = sortedInsert;
+ /**
+ * this storage supports insert operation?
+ *
+ * @return true if insert operation is allowed.
+ */
+ public boolean isInsertable() {
+ return insertable;
+ }
+
+ /**
+ * Does this storage allow the use of arbitrary absolute paths outside the tablespace?
+ *
+ * @return the absolutePathAllowed flag that was passed to the constructor
+ */
+ public boolean isArbitraryPathAllowed() {
+ return this.absolutePathAllowed;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 926b5d3..20a5d5c 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -68,7 +68,7 @@ public class StorageUtil extends StorageConstants {
return 0;
}
}
-
+
public static Path concatPath(String parent, String...childs) {
return concatPath(new Path(parent), childs);
}
@@ -82,7 +82,7 @@ public class StorageUtil extends StorageConstants {
sb.append("/");
}
- return new Path(parent, sb.toString());
+ return new Path(parent + "/" + sb.toString());
}
static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
index a787cdb..ef04509 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
@@ -19,236 +19,372 @@
package org.apache.tajo.storage;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.Pair;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.net.URI;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI;
/**
* It handles available table spaces and cache TableSpace instances.
+ *
+ * Default tablespace must be a filesystem-based one.
+ * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode.
+ * Local file system can be a default tablespace if a Tajo cluster runs on a single machine.
*/
-public class TableSpaceManager {
+public class TableSpaceManager implements StorageService {
+ private static final Log LOG = LogFactory.getLog(TableSpaceManager.class);
- /**
- * Cache of scanner handlers for each storage type.
- */
- protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends Scanner>>();
- /**
- * Cache of appender handlers for each storage type.
- */
- protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends Appender>>();
- private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
- Configuration.class,
- Schema.class,
- TableMeta.class,
- Fragment.class
- };
- private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
- Configuration.class,
- TaskAttemptId.class,
- Schema.class,
- TableMeta.class,
- Path.class
- };
- /**
- * Cache of Tablespace.
- * Key is manager key(warehouse path) + store type
- */
- private static final Map<String, Tablespace> storageManagers = Maps.newHashMap();
- /**
- * Cache of constructors for each class. Pins the classes so they
- * can't be garbage collected until ReflectionUtils can be collected.
- */
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
- new ConcurrentHashMap<Class<?>, Constructor<?>>();
+ public static final String DEFAULT_CONFIG_FILE = "storage-default.json";
+ public static final String SITE_CONFIG_FILE = "storage-site.json";
+
+ /** default tablespace name */
+ public static final String DEFAULT_TABLESPACE_NAME = "default";
+
+ private final static TajoConf systemConf = new TajoConf();
+ private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
+
+ // The relationship among name, URI, and Tablespace must be kept 1:1:1.
+ protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap();
+ protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap();
+ protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
+ protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
+
+ public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class};
+
+ static {
+ instance = new TableSpaceManager();
+ }
/**
- * Clear all class cache
+ * Singleton instance
*/
- @VisibleForTesting
- protected synchronized static void clearCache() {
- CONSTRUCTOR_CACHE.clear();
- SCANNER_HANDLER_CACHE.clear();
- APPENDER_HANDLER_CACHE.clear();
- storageManagers.clear();
+ private static final TableSpaceManager instance;
+
+ private TableSpaceManager() {
+ initForDefaultConfig(); // loading storage-default.json
+ initSiteConfig(); // storage-site.json will override the configs of storage-default.json
+ addWarehouseAsSpace(); // adding a warehouse directory for a default tablespace
+ addLocalFsTablespace(); // adding a tablespace using local file system by default
}
- /**
- * Close Tablespace
- * @throws java.io.IOException
- */
- public static void shutdown() throws IOException {
- synchronized(storageManagers) {
- for (Tablespace eachTablespace : storageManagers.values()) {
- eachTablespace.close();
- }
+ private void addWarehouseAsSpace() {
+ Path warehouseDir = TajoConf.getWarehouseDir(systemConf);
+ registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false);
+ }
+
+ private void addLocalFsTablespace() {
+ if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) {
+ String tmpName = UUID.randomUUID().toString();
+ registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false);
}
- clearCache();
}
- /**
- * Returns FileStorageManager instance.
- *
- * @param tajoConf Tajo system property.
- * @return
- * @throws IOException
- */
- public static Tablespace getFileStorageManager(TajoConf tajoConf) throws IOException {
- return getStorageManager(tajoConf, "CSV");
+ public static TableSpaceManager getInstance() {
+ return instance;
}
- /**
- * Returns the proper Tablespace instance according to the storeType.
- *
- * @param tajoConf Tajo system property.
- * @param storeType Storage type
- * @return
- * @throws IOException
- */
- public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
- FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
- if (fileSystem != null) {
- return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
- } else {
- return getStorageManager(tajoConf, storeType, null);
+ /**
+ * Loads DEFAULT_CONFIG_FILE and applies it with override disabled.
+ *
+ * @throws IllegalStateException if loadFromConfig yields no JSON for the default config file
+ */
+ private void initForDefaultConfig() {
+ JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE);
+ if (json == null) {
+ // Report the file that was actually looked up; the message used to name SITE_CONFIG_FILE.
+ throw new IllegalStateException("There is no " + DEFAULT_CONFIG_FILE);
}
+ applyConfig(json, false);
}
- /**
- * Returns the proper Tablespace instance according to the storeType
- *
- * @param tajoConf Tajo system property.
- * @param storeType Storage type
- * @param managerKey Key that can identify each storage manager(may be a path)
- * @return
- * @throws IOException
- */
- private static synchronized Tablespace getStorageManager (
- TajoConf tajoConf, String storeType, String managerKey) throws IOException {
+ private void initSiteConfig() {
+ JSONObject json = loadFromConfig(SITE_CONFIG_FILE);
+
+ // if there is no storage-site.json file, nothing happen.
+ if (json != null) {
+ applyConfig(json, true);
+ }
+ }
+
+ private JSONObject loadFromConfig(String fileName) {
+ String json;
+ try {
+ json = FileUtil.readTextFileFromResource(fileName);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
- String typeName;
- if (storeType.equalsIgnoreCase("HBASE")) {
- typeName = "hbase";
+ if (json != null) {
+ return parseJson(json);
} else {
- typeName = "hdfs";
+ return null;
}
+ }
+
+ private static JSONObject parseJson(String json) {
+ try {
+ return (JSONObject) parser.parse(json);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
- synchronized (storageManagers) {
- String storeKey = typeName + "_" + managerKey;
- Tablespace manager = storageManagers.get(storeKey);
+ private void applyConfig(JSONObject json, boolean override) {
+ loadStorages(json);
+ loadTableSpaces(json, override);
+ }
- if (manager == null) {
- Class<? extends Tablespace> storageManagerClass =
- tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class);
+ private void loadStorages(JSONObject json) {
+ JSONObject spaces = (JSONObject) json.get(KEY_STORAGES);
- if (storageManagerClass == null) {
- throw new IOException("Unknown Storage Type: " + typeName);
- }
+ if (spaces != null) {
+ Pair<String, Class<? extends Tablespace>> pair = null;
+ for (Map.Entry<String, Object> entry : spaces.entrySet()) {
try {
- Constructor<? extends Tablespace> constructor =
- (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
- if (constructor == null) {
- constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
- constructor.setAccessible(true);
- CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
- }
- manager = constructor.newInstance(new Object[]{storeType});
- } catch (Exception e) {
- throw new RuntimeException(e);
+ pair = extractStorage(entry);
+ } catch (ClassNotFoundException e) {
+ LOG.warn(e);
+ continue;
}
- manager.init(tajoConf);
- storageManagers.put(storeKey, manager);
+
+ TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond());
+ }
+ }
+ }
+
+ private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry)
+ throws ClassNotFoundException {
+
+ String storageType = entry.getKey();
+ JSONObject storageDesc = (JSONObject) entry.getValue();
+ String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
+
+ return new Pair<String, Class<? extends Tablespace>>(
+ storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
+ }
+
+ private void loadTableSpaces(JSONObject json, boolean override) {
+ JSONObject spaces = (JSONObject) json.get(KEY_SPACES);
+
+ if (spaces != null) {
+ for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+ AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override);
}
+ }
+ }
- return manager;
+ public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) {
+ boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default"));
+ URI spaceUri = URI.create(spaceDesc.getAsString("uri"));
+
+ if (defaultSpace) {
+ registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override);
}
+ registerTableSpace(spaceName, spaceUri, spaceDesc, true, override);
}
- /**
- * Returns Scanner instance.
- *
- * @param conf The system property
- * @param meta The table meta
- * @param schema The input schema
- * @param fragment The fragment for scanning
- * @param target The output schema
- * @return Scanner instance
- * @throws IOException
- */
- public static synchronized SeekableScanner getSeekableScanner(
- TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
- return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
+ private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc,
+ boolean visible, boolean override) {
+ Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible);
+ tableSpace.setVisible(visible);
+
+ try {
+ tableSpace.init(systemConf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ putTablespace(tableSpace, override);
+
+ // If the arbitrary path is allowed, root uri is also added as a tablespace
+ if (tableSpace.getProperty().isArbitraryPathAllowed()) {
+ URI rootUri = tableSpace.getRootUri();
+ // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace.
+ if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) {
+ String tmpName = UUID.randomUUID().toString();
+ registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
+ }
+ }
+ }
+
+ private static void putTablespace(Tablespace space, boolean override) {
+ // It is a device to keep the relationship among name, URI, and tablespace 1:1:1.
+
+ boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName());
+ boolean uriExist = TABLE_SPACES.containsKey(space.uri);
+
+ boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri());
+ mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space);
+
+ if (!override && mismatch) {
+ throw new RuntimeException("Name or URI of Tablespace must be unique.");
+ }
+
+ SPACES_URIS_MAP.put(space.getName(), space.getUri());
+ // We must guarantee that the same uri results in the same tablespace instance.
+ TABLE_SPACES.put(space.getUri(), space);
}
/**
- * Creates a scanner instance.
+ * Return length of the fragment.
+ * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
*
- * @param theClass Concrete class of scanner
- * @param conf System property
- * @param schema Input schema
- * @param meta Table meta data
- * @param fragment The fragment for scanning
- * @param <T>
- * @return The scanner instance
+ * @param conf Tajo system property
+ * @param fragment Fragment
+ * @return
*/
- public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
- Fragment fragment) {
- T result;
+ public static long guessFragmentVolume(TajoConf conf, Fragment fragment) {
+ if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+ return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+ } else {
+ return fragment.getLength();
+ }
+ }
+
+ public static final String KEY_STORAGES = "storages"; // storages
+ public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler
+ public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format
+
+ public static final String KEY_SPACES = "spaces";
+
+ private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) {
+ Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri);
+ Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme());
+
+ if (clazz == null) {
+ throw new RuntimeException("There is no tablespace for " + uri.toString());
+ }
+
try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
+ Constructor<? extends Tablespace> constructor =
+ (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz);
+
+ if (constructor == null) {
+ constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM);
+ constructor.setAccessible(true);
+ CONSTRUCTORS.put(clazz, constructor);
}
- result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+
+ return constructor.newInstance(new Object[]{spaceName, uri});
} catch (Exception e) {
throw new RuntimeException(e);
}
+ }
+
+ @VisibleForTesting
+ public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) {
+ Tablespace existing;
+ synchronized (SPACES_URIS_MAP) {
+ // Remove existing one
+ SPACES_URIS_MAP.remove(space.getName());
+ existing = TABLE_SPACES.remove(space.getUri());
+
+ // Add another one for test
+ registerTableSpace(space.name, space.uri, null, true, true);
+ }
+ // if there is an existing one, return it.
+ return Optional.fromNullable(existing);
+ }
- return result;
+ public Iterable<String> getSupportSchemes() {
+ return TABLE_SPACE_HANDLERS.keySet();
}
/**
- * Creates a scanner instance.
+ * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
*
- * @param theClass Concrete class of scanner
- * @param conf System property
- * @param taskAttemptId Task id
- * @param meta Table meta data
- * @param schema Input schema
- * @param workDir Working directory
- * @param <T>
- * @return The scanner instance
+ * @param uri Table or Table Fragment URI.
+ * @param <T> Tablespace class type
+ * @return Tablespace. If uri is null, the default tablespace will be returned.
*/
- public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
- TableMeta meta, Schema schema, Path workDir) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
+ public static <T extends Tablespace> Optional<T> get(@Nullable String uri) {
+
+ if (uri == null || uri.isEmpty()) {
+ return (Optional<T>) Optional.of(getDefault());
+ }
+
+ Tablespace lastOne = null;
+
+ // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
+ // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
+ for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
+ if (uri.startsWith(entry.getKey().toString())) {
+ lastOne = entry.getValue();
}
- result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
- } catch (Exception e) {
- throw new RuntimeException(e);
}
+ return (Optional<T>) Optional.fromNullable(lastOne);
+ }
+
+ /**
+ * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+ *
+ * @param uri Table or Table Fragment URI.
+ * @param <T> Tablespace class type
+ * @return Tablespace. If uri is null, the default tablespace will be returned.
+ */
+ public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) {
+ if (uri == null) {
+ return (Optional<T>) Optional.of(getDefault());
+ } else {
+ return (Optional<T>) get(uri.toString());
+ }
+ }
+
+ /**
+ * It returns the default tablespace. This method ensures that it always returns a tablespace.
+ *
+ * @return
+ */
+ public static <T extends Tablespace> T getDefault() {
+ return (T) getByName(DEFAULT_TABLESPACE_NAME).get();
+ }
+
+ public static <T extends Tablespace> T getLocalFs() {
+ return (T) get(LOCAL_FS_URI).get();
+ }
+
+ /**
+ * Find a tablespace by its registered name.
+ *
+ * @param name Tablespace name
+ * @return The tablespace registered under the name, or absent if there is no such tablespace.
+ */
+ public static Optional<? extends Tablespace> getByName(String name) {
+ URI uri = SPACES_URIS_MAP.get(name);
+ if (uri == null) {
+ return Optional.absent();
+ }
+ // This method does not take the lock that addTableSpaceForTest() holds while it updates both maps,
+ // so the URI may already be gone from TABLE_SPACES. fromNullable() reports that case as absent
+ // instead of throwing NullPointerException as Optional.of() would.
+ return Optional.fromNullable(TABLE_SPACES.get(uri));
+ }
+
+ public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
+ for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) {
+ String uriScheme = entry.getKey().getScheme();
+ if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) {
+ return Optional.of(entry.getValue());
+ }
+ }
+
+ return Optional.absent();
+ }
+
+ @Override
+ public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
+ Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get();
+ return space.getTableUri(databaseName, tableName);
+ }
- return result;
+ public static Iterable<Tablespace> getAllTablespaces() {
+ return TABLE_SPACES.values();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 0626da8..77c5d05 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -19,10 +19,8 @@
package org.apache.tajo.storage;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.OverridableConf;
-import org.apache.tajo.TajoConstants;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
@@ -30,16 +28,20 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Tablespace manages the functions of storing and reading data.
@@ -49,18 +51,24 @@ import java.util.List;
*/
public abstract class Tablespace {
- public static final PathFilter hiddenFileFilter = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
+ protected final String name;
+ protected final URI uri;
+ /** this space is visible or not. */
+ protected boolean visible = true;
protected TajoConf conf;
- protected String storeType;
- public Tablespace(String storeType) {
- this.storeType = storeType;
+ public Tablespace(String name, URI uri) {
+ this.name = name;
+ this.uri = uri;
+ }
+
+ public void setVisible(boolean visible) {
+ this.visible = visible;
+ }
+
+ public Set<String> getDependencies() {
+ return Collections.emptySet();
}
/**
@@ -69,24 +77,47 @@ public abstract class Tablespace {
*/
protected abstract void storageInit() throws IOException;
+ public String getName() {
+ return name;
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public boolean isVisible() {
+ return visible;
+ }
+
+ public abstract void setConfig(String name, String value);
+
+ public abstract void setConfigs(Map<String, String> configs);
+
+ public String toString() {
+ return name + "=" + uri.toString();
+ }
+
+ public abstract long getTableVolume(URI uri) throws IOException;
+
/**
- * This method is called after executing "CREATE TABLE" statement.
- * If a storage is a file based storage, a storage manager may create directory.
+ * if {@link StorageProperty#isArbitraryPathAllowed} is true,
+ * the storage allows arbitrary path accesses. In this case, the storage must provide the root URI.
*
- * @param tableDesc Table description which is created.
- * @param ifNotExists Creates the table only when the table does not exist.
- * @throws java.io.IOException
+ * @see StorageProperty#isArbitraryPathAllowed
+ * @return Root URI
*/
- public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
+ public URI getRootUri() {
+ throw new UnsupportedException(
+ String.format("Tablespace '%s' does not allow the use of artibrary paths", uri.toString()));
+ }
/**
- * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
- * which is the option to delete all the data.
+ * Get Table URI
*
- * @param tableDesc
- * @throws java.io.IOException
+ * @param tableName
+ * @return
*/
- public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+ public abstract URI getTableUri(String databaseName, String tableName);
/**
* Returns the splits that will serve as input for the scan tasks. The
@@ -116,7 +147,9 @@ public abstract class Tablespace {
* It returns the storage property.
* @return The storage property
*/
- public abstract StorageProperty getStorageProperty();
+ public abstract StorageProperty getProperty();
+
+ public abstract FormatProperty getFormatProperty(String dataFormat);
/**
* Release storage manager resource
@@ -137,21 +170,10 @@ public abstract class Tablespace {
* @throws java.io.IOException
*/
public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
- Schema inputSchema, SortSpec[] sortSpecs,
+ Schema inputSchema, SortSpec [] sortSpecs,
TupleRange dataRange) throws IOException;
/**
- * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
- * In general Tajo creates the target table after finishing the final sub-query of CATS.
- * But In the special cases, such as HBase INSERT or CAST query uses the target table information.
- * That kind of the storage should implements the logic related to creating table in this method.
- *
- * @param node The child node of the root node.
- * @throws java.io.IOException
- */
- public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
-
- /**
* It is called when the query failed.
* Each storage manager should implement to be processed when the query fails in this method.
*
@@ -160,21 +182,13 @@ public abstract class Tablespace {
*/
/**
- * Returns the current storage type.
- * @return
- */
- public String getStoreType() {
- return storeType;
- }
-
- /**
* Initialize Tablespace instance. It should be called before using.
*
* @param tajoConf
* @throws java.io.IOException
*/
public void init(TajoConf tajoConf) throws IOException {
- this.conf = tajoConf;
+ this.conf = new TajoConf(tajoConf);
storageInit();
}
@@ -239,7 +253,7 @@ public abstract class Tablespace {
Scanner scanner;
Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
- scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
+ scanner = OldStorageManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
scanner.setTarget(target.toArray());
return scanner;
@@ -263,18 +277,18 @@ public abstract class Tablespace {
Class<? extends Appender> appenderClass;
String handlerName = meta.getStoreType().toLowerCase();
- appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName);
+ appenderClass = OldStorageManager.APPENDER_HANDLER_CACHE.get(handlerName);
if (appenderClass == null) {
appenderClass = conf.getClass(
String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
- TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+ OldStorageManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
}
if (appenderClass == null) {
throw new IOException("Unknown Storage Type: " + meta.getStoreType());
}
- appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
+ appender = OldStorageManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
return appender;
}
@@ -288,11 +302,11 @@ public abstract class Tablespace {
*/
public Class<? extends Scanner> getScannerClass(String storeType) throws IOException {
String handlerName = storeType.toLowerCase();
- Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName);
+ Class<? extends Scanner> scannerClass = OldStorageManager.SCANNER_HANDLER_CACHE.get(handlerName);
if (scannerClass == null) {
scannerClass = conf.getClass(
String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
- TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+ OldStorageManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
}
if (scannerClass == null) {
@@ -303,43 +317,54 @@ public abstract class Tablespace {
}
/**
- * Return length of the fragment.
- * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
+ * It is called after making logical plan. Storage manager should verify the schema for inserting.
*
- * @param conf Tajo system property
- * @param fragment Fragment
- * @return
+ * @param tableDesc The table description of insert target.
+ * @param outSchema The output schema of select query for inserting.
+ * @throws java.io.IOException
*/
- public static long getFragmentLength(TajoConf conf, Fragment fragment) {
- if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
- return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
- } else {
- return fragment.getLength();
- }
+ public abstract void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException;
+
+ /**
+ * Rewrite the logical plan. It is assumed that the final plan will be given in this method.
+ */
+ public void rewritePlan(OverridableConf context, LogicalPlan plan) throws PlanningException {
+ // nothing to do by default
}
- public abstract void rollbackOutputCommit(LogicalNode node) throws IOException;
+ ////////////////////////////////////////////////////////////////////////////
+ // Table Lifecycle Section
+ ////////////////////////////////////////////////////////////////////////////
/**
- * It is called after making logical plan. Storage manager should verify the schema for inserting.
+ * This method is called after executing "CREATE TABLE" statement.
+ * If a storage is a file based storage, a storage manager may create directory.
*
- * @param tableDesc The table description of insert target.
- * @param outSchema The output schema of select query for inserting.
+ * @param tableDesc Table description which is created.
+ * @param ifNotExists Creates the table only when the table does not exist.
* @throws java.io.IOException
*/
- public abstract void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException;
+ public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
/**
- * Returns the list of storage specified rewrite rules.
- * This values are used by LogicalOptimizer.
+ * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
+ * which is the option to delete all the data.
*
- * @param queryContext The query property
- * @param tableDesc The description of the target table.
- * @return The list of storage specified rewrite rules
+ * @param tableDesc
* @throws java.io.IOException
*/
- public abstract List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
- throws IOException;
+ public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+
+ /**
+ * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
+ * In general, Tajo creates the target table after finishing the final sub-query of CTAS.
+ * But in special cases, such as HBase, an INSERT or CTAS query uses the target table information.
+ * Such storage should implement the logic related to creating the table in this method.
+ *
+ * @param node The child node of the root node.
+ * @throws java.io.IOException
+ */
+ public abstract void prepareTable(LogicalNode node) throws IOException;
/**
* Finalizes result data. Tajo stores result data in the staging directory.
@@ -354,7 +379,20 @@ public abstract class Tablespace {
* @return Saved path
* @throws java.io.IOException
*/
- public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
- LogicalPlan plan, Schema schema,
- TableDesc tableDesc) throws IOException;
+ public abstract Path commitTable(OverridableConf queryContext,
+ ExecutionBlockId finalEbId,
+ LogicalPlan plan, Schema schema,
+ TableDesc tableDesc) throws IOException;
+
+ public abstract void rollbackTable(LogicalNode node) throws IOException;
+
+ /**
+ * Two tablespaces are equal when both their names and their URIs are equal.
+ *
+ * @param obj Object to compare with
+ * @return true if obj is a Tablespace with the same name and URI
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Tablespace) {
+ Tablespace other = (Tablespace) obj;
+ return name.equals(other.name) && uri.equals(other.uri);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Hashes the same fields that {@link #equals(Object)} compares, so that equal tablespaces
+ * have equal hash codes.
+ *
+ * @return Hash code derived from the name and the URI
+ */
+ @Override
+ public int hashCode() {
+ return 31 * name.hashCode() + uri.hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
new file mode 100644
index 0000000..40e17f4
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
@@ -0,0 +1,20 @@
+{
+ "storages": {
+ "hdfs": {
+ "handler": "org.apache.tajo.storage.FileTablespace",
+ "default-format": "text"
+ },
+ "file": {
+ "handler": "org.apache.tajo.storage.FileTablespace",
+ "default-format": "text"
+ },
+ "s3": {
+ "handler": "org.apache.tajo.storage.FileTablespace",
+ "default-format": "text"
+ },
+ "hbase": {
+ "handler": "org.apache.tajo.storage.hbase.HBaseTablespace",
+ "default-format": "hbase"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml
index 3456b76..5a1dc9a 100644
--- a/tajo-storage/tajo-storage-hbase/pom.xml
+++ b/tajo-storage/tajo-storage-hbase/pom.xml
@@ -61,6 +61,12 @@
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/resources/*.json</exclude>
+ <exclude>src/test/resources/*.json</exclude>
+ </excludes>
+ </configuration>
<executions>
<execution>
<phase>verify</phase>
@@ -182,6 +188,11 @@
<artifactId>tajo-storage-common</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
index 425f392..0fc2922 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -89,7 +89,7 @@ public abstract class AbstractHBaseAppender implements Appender {
if (enabledStats) {
stats = new TableStatistics(this.schema);
}
- columnMapping = new ColumnMapping(schema, meta);
+ columnMapping = new ColumnMapping(schema, meta.getOptions());
mappingColumnFamilies = columnMapping.getMappingColumns();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
deleted file mode 100644
index 32f1e43..0000000
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.plan.logical.SortNode;
-import org.apache.tajo.plan.logical.SortNode.SortPurpose;
-import org.apache.tajo.plan.logical.UnaryNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
-
-public class AddSortForInsertRewriter implements LogicalPlanRewriteRule {
- private int[] sortColumnIndexes;
- private Column[] sortColumns;
-
- public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
- this.sortColumns = sortColumns;
- this.sortColumnIndexes = new int[sortColumns.length];
-
- Schema tableSchema = tableDesc.getSchema();
- for (int i = 0; i < sortColumns.length; i++) {
- sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
- }
- }
-
- @Override
- public String getName() {
- return "AddSortForInsertRewriter";
- }
-
- @Override
- public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
- String storeType = PlannerUtil.getStoreType(plan);
- return storeType != null;
- }
-
- @Override
- public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- UnaryNode insertNode = rootNode.getChild();
- LogicalNode childNode = insertNode.getChild();
-
- Schema sortSchema = childNode.getOutSchema();
- SortNode sortNode = plan.createNode(SortNode.class);
- sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
- sortNode.setInSchema(sortSchema);
- sortNode.setOutSchema(sortSchema);
-
- SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
- int index = 0;
-
- for (int i = 0; i < sortColumnIndexes.length; i++) {
- Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
- if (sortColumn == null) {
- throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
- }
- sortSpecs[index++] = new SortSpec(sortColumn, true, true);
- }
- sortNode.setSortSpecs(sortSpecs);
-
- sortNode.setChild(insertNode.getChild());
- insertNode.setChild(sortNode);
- plan.getRootBlock().registerNode(sortNode);
-
- return plan;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
index e66a707..0314e8e 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
@@ -20,18 +20,18 @@ package org.apache.tajo.storage.hbase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.util.KeyValueSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ColumnMapping {
- private TableMeta tableMeta;
private Schema schema;
- private char rowKeyDelimiter;
+ private KeyValueSet tableProperty;
+ private char rowKeyDelimiter;
private String hbaseTableName;
private int[] rowKeyFieldIndexes;
@@ -45,16 +45,15 @@ public class ColumnMapping {
private int numRowKeys;
- public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
+ public ColumnMapping(Schema schema, KeyValueSet tableProperty) throws IOException{
this.schema = schema;
- this.tableMeta = tableMeta;
-
+ this.tableProperty = tableProperty;
init();
}
public void init() throws IOException {
- hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
- String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
+ hbaseTableName = tableProperty.get(HBaseStorageConstants.META_TABLE_KEY);
+ String delim = tableProperty.get(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
if (delim.length() > 0) {
rowKeyDelimiter = delim.charAt(0);
}
@@ -70,7 +69,7 @@ public class ColumnMapping {
rowKeyFieldIndexes[i] = -1;
}
- String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
+ String columnMapping = tableProperty.get(HBaseStorageConstants.META_COLUMNS_KEY, "");
if (columnMapping == null || columnMapping.isEmpty()) {
throw new IOException("'columns' property is required.");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
index 9ea0bf6..5961751 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
@@ -29,8 +29,12 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.hbase.StorageFragmentProtos.*;
+import java.net.URI;
+
public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
@Expose
+ private URI uri;
+ @Expose
private String tableName;
@Expose
private String hbaseTableName;
@@ -45,7 +49,9 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
@Expose
private long length;
- public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) {
+ public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow,
+ String regionLocation) {
+ this.uri = uri;
this.tableName = tableName;
this.hbaseTableName = hbaseTableName;
this.startRow = startRow;
@@ -62,6 +68,7 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
}
private void init(HBaseFragmentProto proto) {
+ this.uri = URI.create(proto.getUri());
this.tableName = proto.getTableName();
this.hbaseTableName = proto.getHbaseTableName();
this.startRow = proto.getStartRow().toByteArray();
@@ -76,6 +83,10 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
return Bytes.compareTo(startRow, t.startRow);
}
+ public URI getUri() {
+ return uri;
+ }
+
@Override
public String getTableName() {
return tableName;
@@ -107,6 +118,7 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
public Object clone() throws CloneNotSupportedException {
HBaseFragment frag = (HBaseFragment) super.clone();
+ frag.uri = uri;
frag.tableName = tableName;
frag.hbaseTableName = hbaseTableName;
frag.startRow = startRow;
@@ -137,16 +149,20 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
@Override
public String toString() {
- return "\"fragment\": {\"tableName\": \""+ tableName + "\", hbaseTableName\": \"" + hbaseTableName + "\"" +
- ", \"startRow\": \"" + new String(startRow) + "\"" +
- ", \"stopRow\": \"" + new String(stopRow) + "\"" +
- ", \"length\": \"" + length + "\"}" ;
+ // Every field is written as "key": "value"; uri is concatenated directly so a null URI prints as null.
+ return
+ "\"fragment\": {\"uri\": \"" + uri + "\", \"tableName\": \"" + tableName +
+ "\", \"hbaseTableName\": \"" + hbaseTableName + "\"" +
+ ", \"startRow\": \"" + new String(startRow) + "\"" +
+ ", \"stopRow\": \"" + new String(stopRow) + "\"" +
+ ", \"length\": \"" + length + "\"}" ;
}
@Override
public FragmentProto getProto() {
HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
- builder.setTableName(tableName)
+ builder
+ .setUri(uri.toString())
+ .setTableName(tableName)
.setHbaseTableName(hbaseTableName)
.setStartRow(ByteString.copyFrom(startRow))
.setStopRow(ByteString.copyFrom(stopRow))
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 19fdf80..916aae7 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -26,28 +26,29 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.storage.Tuple;
import java.io.IOException;
+import java.net.URI;
public class HBasePutAppender extends AbstractHBaseAppender {
+ private URI uri;
private HTableInterface htable;
private long totalNumBytes;
- public HBasePutAppender(Configuration conf, TaskAttemptId taskAttemptId,
+ public HBasePutAppender(Configuration conf, URI uri, TaskAttemptId taskAttemptId,
Schema schema, TableMeta meta, Path stagingDir) {
super(conf, taskAttemptId, schema, meta, stagingDir);
+ this.uri = uri;
}
@Override
public void init() throws IOException {
super.init();
- Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta);
- HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE"))
- .getConnection(hbaseConf);
+ HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(uri).get();
+ HConnection hconn = space.getConnection();
htable = hconn.getTable(columnMapping.getHbaseTableName());
htable.setAutoFlushTo(false);
htable.setWriteBufferSize(5 * 1024 * 1024);
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 992c13c..16f4c14 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -117,7 +117,7 @@ public class HBaseScanner implements Scanner {
targets = schema.toArray();
}
- columnMapping = new ColumnMapping(schema, meta);
+ columnMapping = new ColumnMapping(schema, meta.getOptions());
targetIndexes = new int[targets.length];
int index = 0;
for (Column eachTargetColumn: targets) {
@@ -133,8 +133,8 @@ public class HBaseScanner implements Scanner {
rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
- hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta);
-
+ HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get();
+ hbaseConf = space.getHbaseConf();
initScanner();
}
@@ -181,8 +181,7 @@ public class HBaseScanner implements Scanner {
}
if (htable == null) {
- HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
- .getConnection(hbaseConf);
+ HConnection hconn = ((HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get()).getConnection();
htable = hconn.getTable(fragment.getHbaseTableName());
}
scanner = htable.getScanner(scan);