You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/05/15 11:16:59 UTC
[2/3] tajo git commit: TAJO-1603: Refactor StorageManager. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index 24d2dfa..0751035 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -18,33 +18,28 @@
package org.apache.tajo.storage;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.TUtil;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.text.NumberFormat;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* StorageManager manages the functions of storing and reading data.
@@ -52,23 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
* For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class.
*
*/
-public abstract class StorageManager {
- private final Log LOG = LogFactory.getLog(StorageManager.class);
-
- private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
- Configuration.class,
- Schema.class,
- TableMeta.class,
- Fragment.class
- };
-
- private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
- Configuration.class,
- TaskAttemptId.class,
- Schema.class,
- TableMeta.class,
- Path.class
- };
+public abstract class StorageManager implements TableSpace {
public static final PathFilter hiddenFileFilter = new PathFilter() {
public boolean accept(Path p) {
@@ -80,31 +59,6 @@ public abstract class StorageManager {
protected TajoConf conf;
protected String storeType;
- /**
- * Cache of StorageManager.
- * Key is manager key(warehouse path) + store type
- */
- private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
-
- /**
- * Cache of scanner handlers for each storage type.
- */
- protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends Scanner>>();
-
- /**
- * Cache of appender handlers for each storage type.
- */
- protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends Appender>>();
-
- /**
- * Cache of constructors for each class. Pins the classes so they
- * can't be garbage collected until ReflectionUtils can be collected.
- */
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
- new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
public StorageManager(String storeType) {
this.storeType = storeType;
}
@@ -123,6 +77,7 @@ public abstract class StorageManager {
* @param ifNotExists Creates the table only when the table does not exist.
* @throws java.io.IOException
*/
+ @Override
public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
/**
@@ -132,6 +87,7 @@ public abstract class StorageManager {
* @param tableDesc
* @throws java.io.IOException
*/
+ @Override
public abstract void purgeTable(TableDesc tableDesc) throws IOException;
/**
@@ -143,6 +99,7 @@ public abstract class StorageManager {
* @return The list of input fragments.
* @throws java.io.IOException
*/
+ @Override
public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc,
ScanNode scanNode) throws IOException;
@@ -167,21 +124,11 @@ public abstract class StorageManager {
/**
* Release storage manager resource
*/
- public abstract void closeStorageManager();
+ @Override
+ public abstract void close();
/**
- * Clear all class cache
- */
- @VisibleForTesting
- protected synchronized static void clearCache() {
- CONSTRUCTOR_CACHE.clear();
- SCANNER_HANDLER_CACHE.clear();
- APPENDER_HANDLER_CACHE.clear();
- storageManagers.clear();
- }
-
- /**
* It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
* In general Repartitioner determines the partition range using previous output statistics data.
* In the special cases, such as HBase Repartitioner uses the result of this method.
@@ -237,19 +184,6 @@ public abstract class StorageManager {
}
/**
- * Close StorageManager
- * @throws java.io.IOException
- */
- public static void close() throws IOException {
- synchronized(storageManagers) {
- for (StorageManager eachStorageManager: storageManagers.values()) {
- eachStorageManager.closeStorageManager();
- }
- }
- clearCache();
- }
-
- /**
* Returns the splits that will serve as input for the scan tasks. The
* number of splits matches the number of regions in a table.
*
@@ -263,85 +197,6 @@ public abstract class StorageManager {
}
/**
- * Returns FileStorageManager instance.
- *
- * @param tajoConf Tajo system property.
- * @return
- * @throws java.io.IOException
- */
- public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
- return getStorageManager(tajoConf, "CSV");
- }
-
- /**
- * Returns the proper StorageManager instance according to the storeType.
- *
- * @param tajoConf Tajo system property.
- * @param storeType Storage type
- * @return
- * @throws java.io.IOException
- */
- public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
- FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
- if (fileSystem != null) {
- return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
- } else {
- return getStorageManager(tajoConf, storeType, null);
- }
- }
-
- /**
- * Returns the proper StorageManager instance according to the storeType
- *
- * @param tajoConf Tajo system property.
- * @param storeType Storage type
- * @param managerKey Key that can identify each storage manager(may be a path)
- * @return
- * @throws java.io.IOException
- */
- private static synchronized StorageManager getStorageManager (
- TajoConf tajoConf, String storeType, String managerKey) throws IOException {
-
- String typeName;
- if (storeType.equalsIgnoreCase("HBASE")) {
- typeName = "hbase";
- } else {
- typeName = "hdfs";
- }
-
- synchronized (storageManagers) {
- String storeKey = typeName + "_" + managerKey;
- StorageManager manager = storageManagers.get(storeKey);
-
- if (manager == null) {
- Class<? extends StorageManager> storageManagerClass =
- tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
-
- if (storageManagerClass == null) {
- throw new IOException("Unknown Storage Type: " + typeName);
- }
-
- try {
- Constructor<? extends StorageManager> constructor =
- (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass);
- if (constructor == null) {
- constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
- constructor.setAccessible(true);
- CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
- }
- manager = constructor.newInstance(new Object[]{storeType});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- manager.init(tajoConf);
- storageManagers.put(storeKey, manager);
- }
-
- return manager;
- }
- }
-
- /**
* Returns Scanner instance.
*
* @param meta The table meta
@@ -351,6 +206,7 @@ public abstract class StorageManager {
* @return Scanner instance
* @throws java.io.IOException
*/
+ @Override
public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
}
@@ -364,6 +220,7 @@ public abstract class StorageManager {
* @return Scanner instance
* @throws java.io.IOException
*/
+ @Override
public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
return getScanner(meta, schema, fragment, schema);
}
@@ -378,6 +235,7 @@ public abstract class StorageManager {
* @return Scanner instance
* @throws java.io.IOException
*/
+ @Override
public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
if (fragment.isEmpty()) {
Scanner scanner = new NullScanner(conf, schema, meta, fragment);
@@ -389,29 +247,13 @@ public abstract class StorageManager {
Scanner scanner;
Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
- scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
+ scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
scanner.setTarget(target.toArray());
return scanner;
}
/**
- * Returns Scanner instance.
- *
- * @param conf The system property
- * @param meta The table meta
- * @param schema The input schema
- * @param fragment The fragment for scanning
- * @param target The output schema
- * @return Scanner instance
- * @throws java.io.IOException
- */
- public static synchronized SeekableScanner getSeekableScanner(
- TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
- return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
- }
-
- /**
* Returns Appender instance.
* @param queryContext Query property.
* @param taskAttemptId Task id.
@@ -429,82 +271,23 @@ public abstract class StorageManager {
Class<? extends Appender> appenderClass;
String handlerName = meta.getStoreType().toLowerCase();
- appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+ appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName);
if (appenderClass == null) {
appenderClass = conf.getClass(
String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
- APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+ TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
}
if (appenderClass == null) {
throw new IOException("Unknown Storage Type: " + meta.getStoreType());
}
- appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
+ appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
return appender;
}
/**
- * Creates a scanner instance.
- *
- * @param theClass Concrete class of scanner
- * @param conf System property
- * @param schema Input schema
- * @param meta Table meta data
- * @param fragment The fragment for scanning
- * @param <T>
- * @return The scanner instance
- */
- public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
- Fragment fragment) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
-
- /**
- * Creates a scanner instance.
- *
- * @param theClass Concrete class of scanner
- * @param conf System property
- * @param taskAttemptId Task id
- * @param meta Table meta data
- * @param schema Input schema
- * @param workDir Working directory
- * @param <T>
- * @return The scanner instance
- */
- public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
- TableMeta meta, Schema schema, Path workDir) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
-
- /**
* Return the Scanner class for the StoreType that is defined in storage-default.xml.
*
* @param storeType store type
@@ -513,11 +296,11 @@ public abstract class StorageManager {
*/
public Class<? extends Scanner> getScannerClass(String storeType) throws IOException {
String handlerName = storeType.toLowerCase();
- Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
+ Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName);
if (scannerClass == null) {
scannerClass = conf.getClass(
String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
- SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+ TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
}
if (scannerClass == null) {
@@ -550,6 +333,7 @@ public abstract class StorageManager {
* @param outSchema The output schema of select query for inserting.
* @throws java.io.IOException
*/
+ @Override
public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
// nothing to do
}
@@ -563,7 +347,9 @@ public abstract class StorageManager {
* @return The list of storage specified rewrite rules
* @throws java.io.IOException
*/
- public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
+ @Override
+ public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
+ throws IOException {
return null;
}
@@ -580,375 +366,8 @@ public abstract class StorageManager {
* @return Saved path
* @throws java.io.IOException
*/
- public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+ @Override
+ public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
LogicalPlan plan, Schema schema,
- TableDesc tableDesc) throws IOException {
- return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true);
- }
-
- /**
- * Finalizes result data. Tajo stores result data in the staging directory.
- * If the query fails, clean up the staging directory.
- * Otherwise the query is successful, move to the final directory from the staging directory.
- *
- * @param queryContext The query property
- * @param finalEbId The final execution block id
- * @param plan The query plan
- * @param schema The final output schema
- * @param tableDesc The description of the target table
- * @param changeFileSeq If true change result file name with max sequence.
- * @return Saved path
- * @throws java.io.IOException
- */
- protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
- LogicalPlan plan, Schema schema,
- TableDesc tableDesc, boolean changeFileSeq) throws IOException {
- Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
- Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
- Path finalOutputDir;
- if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
- finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
- try {
- FileSystem fs = stagingResultDir.getFileSystem(conf);
-
- if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO
-
- // It moves the original table into the temporary location.
- // Then it moves the new result table into the original table location.
- // Upon failed, it recovers the original table if possible.
- boolean movedToOldTable = false;
- boolean committed = false;
- Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
- ContentSummary summary = fs.getContentSummary(stagingResultDir);
-
- if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) {
- // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
- // renaming directory.
- Map<Path, Path> renameDirs = TUtil.newHashMap();
- // This is a map for recovering existing partition directory. A key is current directory and a value is
- // temporary directory to back up.
- Map<Path, Path> recoveryDirs = TUtil.newHashMap();
-
- try {
- if (!fs.exists(finalOutputDir)) {
- fs.mkdirs(finalOutputDir);
- }
-
- visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
- renameDirs, oldTableDir);
-
- // Rename target partition directories
- for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
- // Backup existing data files for recovering
- if (fs.exists(entry.getValue())) {
- String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
- oldTableDir.toString());
- Path recoveryPath = new Path(recoveryPathString);
- fs.rename(entry.getValue(), recoveryPath);
- fs.exists(recoveryPath);
- recoveryDirs.put(entry.getValue(), recoveryPath);
- }
- // Delete existing directory
- fs.delete(entry.getValue(), true);
- // Rename staging directory to final output directory
- fs.rename(entry.getKey(), entry.getValue());
- }
-
- } catch (IOException ioe) {
- // Remove created dirs
- for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
- fs.delete(entry.getValue(), true);
- }
-
- // Recovery renamed dirs
- for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
- fs.delete(entry.getValue(), true);
- fs.rename(entry.getValue(), entry.getKey());
- }
-
- throw new IOException(ioe.getMessage());
- }
- } else { // no partition
- try {
-
- // if the final output dir exists, move all contents to the temporary table dir.
- // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
- if (fs.exists(finalOutputDir)) {
- fs.mkdirs(oldTableDir);
-
- for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
- fs.rename(status.getPath(), oldTableDir);
- }
-
- movedToOldTable = fs.exists(oldTableDir);
- } else { // if the parent does not exist, make its parent directory.
- fs.mkdirs(finalOutputDir);
- }
-
- // Move the results to the final output dir.
- for (FileStatus status : fs.listStatus(stagingResultDir)) {
- fs.rename(status.getPath(), finalOutputDir);
- }
-
- // Check the final output dir
- committed = fs.exists(finalOutputDir);
-
- } catch (IOException ioe) {
- // recover the old table
- if (movedToOldTable && !committed) {
-
- // if commit is failed, recover the old data
- for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
- fs.delete(status.getPath(), true);
- }
-
- for (FileStatus status : fs.listStatus(oldTableDir)) {
- fs.rename(status.getPath(), finalOutputDir);
- }
- }
-
- throw new IOException(ioe.getMessage());
- }
- }
- } else {
- String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
-
- if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table
-
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(3);
-
- if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
- for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
- if (eachFile.isFile()) {
- LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
- continue;
- }
- moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
- }
- } else {
- int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
- for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
- if (eachFile.getPath().getName().startsWith("_")) {
- continue;
- }
- moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq);
- }
- }
- // checking all file moved and remove empty dir
- verifyAllFileMoved(fs, stagingResultDir);
- FileStatus[] files = fs.listStatus(stagingResultDir);
- if (files != null && files.length != 0) {
- for (FileStatus eachFile: files) {
- LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
- }
- }
- } else { // CREATE TABLE AS SELECT (CTAS)
- if (fs.exists(finalOutputDir)) {
- for (FileStatus status : fs.listStatus(stagingResultDir)) {
- fs.rename(status.getPath(), finalOutputDir);
- }
- } else {
- fs.rename(stagingResultDir, finalOutputDir);
- }
- LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
- }
- }
-
- // remove the staging directory if the final output dir is given.
- Path stagingDirRoot = stagingDir.getParent();
- fs.delete(stagingDirRoot, true);
- } catch (Throwable t) {
- LOG.error(t);
- throw new IOException(t);
- }
- } else {
- finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
- }
-
- return finalOutputDir;
- }
-
- /**
- * Attach the sequence number to the output file name and than move the file into the final result path.
- *
- * @param fs FileSystem
- * @param stagingResultDir The staging result dir
- * @param fileStatus The file status
- * @param finalOutputPath Final output path
- * @param nf Number format
- * @param fileSeq The sequence number
- * @throws java.io.IOException
- */
- private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
- FileStatus fileStatus, Path finalOutputPath,
- NumberFormat nf,
- int fileSeq, boolean changeFileSeq) throws IOException {
- if (fileStatus.isDirectory()) {
- String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
- if (subPath != null) {
- Path finalSubPath = new Path(finalOutputPath, subPath);
- if (!fs.exists(finalSubPath)) {
- fs.mkdirs(finalSubPath);
- }
- int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
- for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
- if (eachFile.getPath().getName().startsWith("_")) {
- continue;
- }
- moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq);
- }
- } else {
- throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
- }
- } else {
- String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
- if (subPath != null) {
- Path finalSubPath = new Path(finalOutputPath, subPath);
- if (changeFileSeq) {
- finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
- }
- if (!fs.exists(finalSubPath.getParent())) {
- fs.mkdirs(finalSubPath.getParent());
- }
- if (fs.exists(finalSubPath)) {
- throw new IOException("Already exists data file:" + finalSubPath);
- }
- boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
- if (success) {
- LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
- "to final output[" + finalSubPath + "]");
- } else {
- LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
- "to final output[" + finalSubPath + "]");
- }
- }
- }
- }
-
- /**
- * Removes the path of the parent.
- * @param parentPath
- * @param childPath
- * @return
- */
- private String extractSubPath(Path parentPath, Path childPath) {
- String parentPathStr = parentPath.toUri().getPath();
- String childPathStr = childPath.toUri().getPath();
-
- if (parentPathStr.length() > childPathStr.length()) {
- return null;
- }
-
- int index = childPathStr.indexOf(parentPathStr);
- if (index != 0) {
- return null;
- }
-
- return childPathStr.substring(parentPathStr.length() + 1);
- }
-
- /**
- * Attach the sequence number to a path.
- *
- * @param path Path
- * @param seq sequence number
- * @param nf Number format
- * @return New path attached with sequence number
- * @throws java.io.IOException
- */
- private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
- String[] tokens = path.getName().split("-");
- if (tokens.length != 4) {
- throw new IOException("Wrong result file name:" + path);
- }
- return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
- }
-
- /**
- * Make sure all files are moved.
- * @param fs FileSystem
- * @param stagingPath The stagind directory
- * @return
- * @throws java.io.IOException
- */
- private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
- FileStatus[] files = fs.listStatus(stagingPath);
- if (files != null && files.length != 0) {
- for (FileStatus eachFile: files) {
- if (eachFile.isFile()) {
- LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
- return false;
- } else {
- if (verifyAllFileMoved(fs, eachFile.getPath())) {
- fs.delete(eachFile.getPath(), false);
- } else {
- return false;
- }
- }
- }
- }
-
- return true;
- }
-
- /**
- * This method sets a rename map which includes renamed staging directory to final output directory recursively.
- * If there exists some data files, this delete it for duplicate data.
- *
- *
- * @param fs
- * @param stagingPath
- * @param outputPath
- * @param stagingParentPathString
- * @throws java.io.IOException
- */
- private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
- String stagingParentPathString,
- Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
- FileStatus[] files = fs.listStatus(stagingPath);
-
- for(FileStatus eachFile : files) {
- if (eachFile.isDirectory()) {
- Path oldPath = eachFile.getPath();
-
- // Make recover directory.
- String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
- oldTableDir.toString());
- Path recoveryPath = new Path(recoverPathString);
- if (!fs.exists(recoveryPath)) {
- fs.mkdirs(recoveryPath);
- }
-
- visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
- renameDirs, oldTableDir);
- // Find last order partition for renaming
- String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
- outputPath.toString());
- Path newPath = new Path(newPathString);
- if (!isLeafDirectory(fs, eachFile.getPath())) {
- renameDirs.put(eachFile.getPath(), newPath);
- } else {
- if (!fs.exists(newPath)) {
- fs.mkdirs(newPath);
- }
- }
- }
- }
- }
-
- private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
- boolean retValue = false;
-
- FileStatus[] files = fs.listStatus(path);
- for (FileStatus file : files) {
- if (fs.isDirectory(file.getPath())) {
- retValue = true;
- break;
- }
- }
-
- return retValue;
- }
+ TableDesc tableDesc) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
new file mode 100644
index 0000000..ef4aa9a
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * It manages each tablespace; e.g., HDFS, Local file system, and Amazon S3.
+ */
+public interface TableSpace extends Closeable {
+ //public void format() throws IOException;
+
+ void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
+
+ void purgeTable(TableDesc tableDesc) throws IOException;
+
+ List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException;
+
+ List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException;
+
+// public void renameTable() throws IOException;
+//
+// public void truncateTable() throws IOException;
+//
+// public long availableCapacity() throws IOException;
+//
+// public long totalCapacity() throws IOException;
+
+ Scanner getScanner(TableMeta meta, Schema schema, CatalogProtos.FragmentProto fragment, Schema target) throws IOException;
+
+ Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException;
+
+ Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException;
+
+ Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+ LogicalPlan plan, Schema schema,
+ TableDesc tableDesc) throws IOException;
+
+ void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException;
+
+ List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException;
+
+ void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
new file mode 100644
index 0000000..42a5e07
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * It handles available table spaces and caches TableSpace instances.
+ */
+public class TableSpaceManager {
+
+ /**
+ * Cache of scanner handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+ /**
+ * Cache of appender handlers for each storage type.
+ */
+ protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
+ = new ConcurrentHashMap<String, Class<? extends Appender>>();
+ /** Constructor signature expected of every Scanner implementation created via reflection. */
+ private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+ Configuration.class,
+ Schema.class,
+ TableMeta.class,
+ Fragment.class
+ };
+ /** Constructor signature expected of every Appender implementation created via reflection. */
+ private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+ Configuration.class,
+ TaskAttemptId.class,
+ Schema.class,
+ TableMeta.class,
+ Path.class
+ };
+ /**
+ * Cache of StorageManager.
+ * Key is manager key(warehouse path) + store type.
+ * Accesses below guard this map by synchronizing on it.
+ */
+ private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
+ /**
+ * Cache of constructors for each class. Pins the classes so they
+ * can't be garbage collected until ReflectionUtils can be collected.
+ */
+ private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+ new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+ /**
+ * Clear all class cache
+ */
+ @VisibleForTesting
+ protected synchronized static void clearCache() {
+ CONSTRUCTOR_CACHE.clear();
+ SCANNER_HANDLER_CACHE.clear();
+ APPENDER_HANDLER_CACHE.clear();
+ // NOTE(review): this clear() takes the class lock, while other accesses lock the map
+ // itself — confirm no concurrent getStorageManager() can run during a clear.
+ storageManagers.clear();
+ }
+
+ /**
+ * Close StorageManager
+ * Closes every cached manager and then clears all caches.
+ * @throws java.io.IOException
+ */
+ public static void shutdown() throws IOException {
+ synchronized(storageManagers) {
+ for (StorageManager eachStorageManager: storageManagers.values()) {
+ eachStorageManager.close();
+ }
+ }
+ // NOTE(review): clearCache() runs outside the lock above; a concurrent
+ // getStorageManager() could repopulate the cache in between — confirm shutdown
+ // is only invoked once all queries have stopped.
+ clearCache();
+ }
+
+ /**
+ * Returns FileStorageManager instance.
+ * The "CSV" store type only selects the file-based ("hdfs") manager; see the private
+ * getStorageManager(TajoConf, String, String) for the type-name mapping.
+ *
+ * @param tajoConf Tajo system property.
+ * @return the shared file-based StorageManager
+ * @throws IOException
+ */
+ public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
+ return getStorageManager(tajoConf, "CSV");
+ }
+
+ /**
+ * Returns the proper StorageManager instance according to the storeType.
+ *
+ * @param tajoConf Tajo system property.
+ * @param storeType Storage type
+ * @return the shared StorageManager for the warehouse file system
+ * @throws IOException
+ */
+ public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+ // The warehouse file-system URI serves as the manager key, so one manager is
+ // shared per file system.
+ FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+ if (fileSystem != null) {
+ return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
+ } else {
+ return getStorageManager(tajoConf, storeType, null);
+ }
+ }
+
+ /**
+ * Returns the proper StorageManager instance according to the storeType
+ *
+ * @param tajoConf Tajo system property.
+ * @param storeType Storage type
+ * @param managerKey Key that can identify each storage manager(may be a path)
+ * @return a cached or newly created StorageManager
+ * @throws IOException
+ */
+ private static synchronized StorageManager getStorageManager (
+ TajoConf tajoConf, String storeType, String managerKey) throws IOException {
+
+ // Every store type other than HBASE is served by the file-based ("hdfs") manager.
+ String typeName;
+ if (storeType.equalsIgnoreCase("HBASE")) {
+ typeName = "hbase";
+ } else {
+ typeName = "hdfs";
+ }
+
+ // NOTE(review): the method-level synchronized (class lock) overlaps with the
+ // lock on storageManagers below; one of the two looks redundant — confirm.
+ synchronized (storageManagers) {
+ String storeKey = typeName + "_" + managerKey;
+ StorageManager manager = storageManagers.get(storeKey);
+
+ if (manager == null) {
+ // The concrete manager class is configurable, e.g. "tajo.storage.manager.hdfs.class".
+ Class<? extends StorageManager> storageManagerClass =
+ tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
+
+ if (storageManagerClass == null) {
+ throw new IOException("Unknown Storage Type: " + typeName);
+ }
+
+ try {
+ // Managers are instantiated reflectively through a one-String(storeType)
+ // constructor; the constructor object is cached to avoid repeated lookups.
+ Constructor<? extends StorageManager> constructor =
+ (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass);
+ if (constructor == null) {
+ constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
+ constructor.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
+ }
+ manager = constructor.newInstance(new Object[]{storeType});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ manager.init(tajoConf);
+ storageManagers.put(storeKey, manager);
+ }
+
+ return manager;
+ }
+ }
+
+ /**
+ * Returns Scanner instance.
+ * NOTE(review): this method is synchronized on the whole class, serializing all
+ * seekable-scanner creation — confirm whether that lock is required.
+ *
+ * @param conf The system property
+ * @param meta The table meta
+ * @param schema The input schema
+ * @param fragment The fragment for scanning
+ * @param target The output schema
+ * @return Scanner instance
+ * @throws IOException
+ */
+ public static synchronized SeekableScanner getSeekableScanner(
+ TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+ return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
+ }
+
+ /**
+ * Creates a scanner instance.
+ * The class must expose a constructor matching DEFAULT_SCANNER_PARAMS.
+ *
+ * @param theClass Concrete class of scanner
+ * @param conf System property
+ * @param schema Input schema
+ * @param meta Table meta data
+ * @param fragment The fragment for scanning
+ * @param <T>
+ * @return The scanner instance
+ */
+ public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
+ Fragment fragment) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+
+ /**
+ * Creates an appender instance.
+ * The class must expose a constructor matching DEFAULT_APPENDER_PARAMS.
+ *
+ * @param theClass Concrete class of appender
+ * @param conf System property
+ * @param taskAttemptId Task id
+ * @param meta Table meta data
+ * @param schema Input schema
+ * @param workDir Working directory
+ * @param <T>
+ * @return The appender instance
+ */
+ public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
+ TableMeta meta, Schema schema, Path workDir) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 7f3cb04..09a86b4 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -28,7 +28,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.storage.Tuple;
import java.io.IOException;
@@ -47,7 +47,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
super.init();
Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
- HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, "HBASE"))
+ HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE"))
.getConnection(hbaseConf);
htable = hconn.getTable(columnMapping.getHbaseTableName());
htable.setAutoFlushTo(false);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index df60bb3..24bfd4d 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -36,10 +36,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BytesUtils;
@@ -184,7 +181,7 @@ public class HBaseScanner implements Scanner {
}
if (htable == null) {
- HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, "HBASE"))
+ HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
.getConnection(hbaseConf);
htable = hconn.getTable(fragment.getHbaseTableName());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
index 5f0695c..3653574 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
@@ -18,6 +18,7 @@
package org.apache.tajo.storage.hbase;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,7 @@ import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.CreateTableNode;
@@ -78,7 +80,7 @@ public class HBaseStorageManager extends StorageManager {
}
@Override
- public void closeStorageManager() {
+ public void close() {
synchronized (connMap) {
for (HConnection eachConn: connMap.values()) {
try {
@@ -942,6 +944,8 @@ public class HBaseStorageManager extends StorageManager {
if (tableDesc == null) {
throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
}
+ Preconditions.checkArgument(tableDesc.getName() != null && tableDesc.getPath() == null);
+
Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
@@ -960,29 +964,23 @@ public class HBaseStorageManager extends StorageManager {
}
committer.commitJob(jobContext);
- if (tableDesc.getName() == null && tableDesc.getPath() != null) {
-
- // insert into location
- return super.commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, false);
- } else {
- // insert into table
- String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY);
+ // insert into table
+ String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY);
- HTable htable = new HTable(hbaseConf, tableName);
+ HTable htable = new HTable(hbaseConf, tableName);
+ try {
+ LoadIncrementalHFiles loadIncrementalHFiles = null;
try {
- LoadIncrementalHFiles loadIncrementalHFiles = null;
- try {
- loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new IOException(e.getMessage(), e);
- }
- loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable);
-
- return stagingResultDir;
- } finally {
- htable.close();
+ loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e.getMessage(), e);
}
+ loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable);
+
+ return stagingResultDir;
+ } finally {
+ htable.close();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
index aa7aa28..39ccf44 100644
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
@@ -19,14 +19,13 @@
package org.apache.tajo.storage.hbase;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.Pair;
import org.junit.Test;
@@ -48,7 +47,7 @@ public class TestHBaseStorageManager {
scanNode.setQual(evalNodeA);
HBaseStorageManager storageManager =
- (HBaseStorageManager) StorageManager.getStorageManager(new TajoConf(), "HBASE");
+ (HBaseStorageManager) TableSpaceManager.getStorageManager(new TajoConf(), "HBASE");
List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
assertNotNull(indexEvals);
assertEquals(1, indexEvals.size());
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 3daed96..c041771 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -57,7 +57,7 @@ public abstract class FileAppender implements Appender {
throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
}
- this.path = ((FileStorageManager)StorageManager.getFileStorageManager((TajoConf) conf))
+ this.path = ((FileStorageManager) TableSpaceManager.getFileStorageManager((TajoConf) conf))
.getAppenderFilePath(taskAttemptId, workDir);
} else {
this.path = workDir;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
index 635dade..4efc3b7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@ -27,17 +27,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.*;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.text.NumberFormat;
@@ -864,7 +865,7 @@ public class FileStorageManager extends StorageManager {
}
@Override
- public void closeStorageManager() {
+ public void close() {
}
@Override
@@ -876,6 +877,12 @@ public class FileStorageManager extends StorageManager {
}
@Override
+ public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
+ Schema schema, TableDesc tableDesc) throws IOException {
+ // Delegates to the staging-directory commit; 'true' renames result files with a max file sequence.
+ return commitOutputData(queryContext, true);
+ }
+
+ @Override
public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
throws IOException {
@@ -899,6 +906,366 @@ public class FileStorageManager extends StorageManager {
FileStatus status = fs.getFileStatus(path);
FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
- return getSeekableScanner(conf, meta, schema, fragment, schema);
+ return TableSpaceManager.getSeekableScanner(conf, meta, schema, fragment, schema);
+ }
+
+ /**
+  * Finalizes result data. Tajo stores result data in the staging directory.
+  * If the query fails, clean up the staging directory.
+  * Otherwise the query is successful, move to the final directory from the staging directory.
+  *
+  * <p>Three commit modes are handled when an output table path is set: INSERT OVERWRITE
+  * (with and without partitions), INSERT INTO an existing table, and CTAS. Without an
+  * output table path, the result stays under the staging dir and that path is returned.
+  *
+  * @param queryContext The query property
+  * @param changeFileSeq If true change result file name with max sequence.
+  * @return Saved path
+  * @throws java.io.IOException
+  */
+ protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException {
+   Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
+   Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+   Path finalOutputDir;
+   if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
+     finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+     try {
+       FileSystem fs = stagingResultDir.getFileSystem(conf);
+
+       if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO
+
+         // It moves the original table into the temporary location.
+         // Then it moves the new result table into the original table location.
+         // Upon failed, it recovers the original table if possible.
+         boolean movedToOldTable = false;
+         boolean committed = false;
+         Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+         ContentSummary summary = fs.getContentSummary(stagingResultDir);
+
+         if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) {
+           // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
+           // renaming directory.
+           Map<Path, Path> renameDirs = TUtil.newHashMap();
+           // This is a map for recovering existing partition directory. A key is current directory and a value is
+           // temporary directory to back up.
+           Map<Path, Path> recoveryDirs = TUtil.newHashMap();
+
+           try {
+             if (!fs.exists(finalOutputDir)) {
+               fs.mkdirs(finalOutputDir);
+             }
+
+             visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
+                 renameDirs, oldTableDir);
+
+             // Rename target partition directories
+             for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+               // Backup existing data files for recovering
+               if (fs.exists(entry.getValue())) {
+                 String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
+                     oldTableDir.toString());
+                 Path recoveryPath = new Path(recoveryPathString);
+                 fs.rename(entry.getValue(), recoveryPath);
+                 recoveryDirs.put(entry.getValue(), recoveryPath);
+               }
+               // Delete existing directory (defensive; a successful backup rename above
+               // already moved it away)
+               fs.delete(entry.getValue(), true);
+               // Rename staging directory to final output directory
+               fs.rename(entry.getKey(), entry.getValue());
+             }
+
+           } catch (IOException ioe) {
+             // Remove created dirs
+             for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+               fs.delete(entry.getValue(), true);
+             }
+
+             // Recovery renamed dirs: drop the partially-committed directory (key) and
+             // restore the backup (value) into its original place.
+             for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
+               fs.delete(entry.getKey(), true);
+               fs.rename(entry.getValue(), entry.getKey());
+             }
+
+             // preserve the original failure as the cause
+             throw new IOException(ioe.getMessage(), ioe);
+           }
+         } else { // no partition
+           try {
+
+             // if the final output dir exists, move all contents to the temporary table dir.
+             // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
+             if (fs.exists(finalOutputDir)) {
+               fs.mkdirs(oldTableDir);
+
+               for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
+                 fs.rename(status.getPath(), oldTableDir);
+               }
+
+               movedToOldTable = fs.exists(oldTableDir);
+             } else { // if the parent does not exist, make its parent directory.
+               fs.mkdirs(finalOutputDir);
+             }
+
+             // Move the results to the final output dir.
+             for (FileStatus status : fs.listStatus(stagingResultDir)) {
+               fs.rename(status.getPath(), finalOutputDir);
+             }
+
+             // Check the final output dir
+             committed = fs.exists(finalOutputDir);
+
+           } catch (IOException ioe) {
+             // recover the old table
+             if (movedToOldTable && !committed) {
+
+               // if commit is failed, recover the old data
+               for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
+                 fs.delete(status.getPath(), true);
+               }
+
+               for (FileStatus status : fs.listStatus(oldTableDir)) {
+                 fs.rename(status.getPath(), finalOutputDir);
+               }
+             }
+
+             // preserve the original failure as the cause
+             throw new IOException(ioe.getMessage(), ioe);
+           }
+         }
+       } else {
+         String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
+
+         if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table
+
+           NumberFormat fmt = NumberFormat.getInstance();
+           fmt.setGroupingUsed(false);
+           fmt.setMinimumIntegerDigits(3);
+
+           if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+             for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+               if (eachFile.isFile()) {
+                 LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
+                 continue;
+               }
+               moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
+             }
+           } else {
+             int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
+             for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+               if (eachFile.getPath().getName().startsWith("_")) {
+                 continue;
+               }
+               moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq);
+             }
+           }
+           // checking all file moved and remove empty dir
+           verifyAllFileMoved(fs, stagingResultDir);
+           FileStatus[] files = fs.listStatus(stagingResultDir);
+           if (files != null && files.length != 0) {
+             for (FileStatus eachFile: files) {
+               LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+             }
+           }
+         } else { // CREATE TABLE AS SELECT (CTAS)
+           if (fs.exists(finalOutputDir)) {
+             for (FileStatus status : fs.listStatus(stagingResultDir)) {
+               fs.rename(status.getPath(), finalOutputDir);
+             }
+           } else {
+             fs.rename(stagingResultDir, finalOutputDir);
+           }
+           LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+         }
+       }
+
+       // remove the staging directory if the final output dir is given.
+       Path stagingDirRoot = stagingDir.getParent();
+       fs.delete(stagingDirRoot, true);
+     } catch (Throwable t) {
+       LOG.error(t);
+       throw new IOException(t);
+     }
+   } else {
+     finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+   }
+
+   return finalOutputDir;
+ }
+
+ /**
+ * Attach the sequence number to the output file name and then move the file into the final result path.
+ * Directories are walked recursively; regular files are renamed into place.
+ *
+ * @param fs FileSystem
+ * @param stagingResultDir The staging result dir
+ * @param fileStatus The file status
+ * @param finalOutputPath Final output path
+ * @param nf Number format
+ * @param fileSeq The sequence number
+ * @param changeFileSeq If true, rewrite the file name's trailing sequence token
+ * @throws java.io.IOException
+ */
+ private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
+ FileStatus fileStatus, Path finalOutputPath,
+ NumberFormat nf,
+ int fileSeq, boolean changeFileSeq) throws IOException {
+ if (fileStatus.isDirectory()) {
+ // Mirror this staging subdirectory under the final output path, then recurse.
+ String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+ if (subPath != null) {
+ Path finalSubPath = new Path(finalOutputPath, subPath);
+ if (!fs.exists(finalSubPath)) {
+ fs.mkdirs(finalSubPath);
+ }
+ int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
+ for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
+ // Skip hidden/meta entries such as "_SUCCESS".
+ if (eachFile.getPath().getName().startsWith("_")) {
+ continue;
+ }
+ moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq);
+ }
+ } else {
+ throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
+ }
+ } else {
+ String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+ if (subPath != null) {
+ Path finalSubPath = new Path(finalOutputPath, subPath);
+ if (changeFileSeq) {
+ finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
+ }
+ if (!fs.exists(finalSubPath.getParent())) {
+ fs.mkdirs(finalSubPath.getParent());
+ }
+ if (fs.exists(finalSubPath)) {
+ throw new IOException("Already exists data file:" + finalSubPath);
+ }
+ // NOTE(review): a failed rename is only logged here, not thrown; the caller
+ // later re-lists the staging dir to detect unmoved files — confirm intended.
+ boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
+ if (success) {
+ LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
+ "to final output[" + finalSubPath + "]");
+ } else {
+ LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
+ "to final output[" + finalSubPath + "]");
+ }
+ }
+ }
+ }
+
+ /**
+  * Returns the path of {@code childPath} relative to {@code parentPath}, or
+  * {@code null} when the child is not a strict descendant of the parent.
+  *
+  * @param parentPath the ancestor directory
+  * @param childPath the path expected to live under {@code parentPath}
+  * @return the relative sub path, or null if childPath is not strictly under parentPath
+  */
+ private String extractSubPath(Path parentPath, Path childPath) {
+   String parentPathStr = parentPath.toUri().getPath();
+   String childPathStr = childPath.toUri().getPath();
+
+   // The child must be strictly longer than the parent; an equal path has no sub path
+   // (and previously triggered a StringIndexOutOfBoundsException in substring below).
+   if (parentPathStr.length() >= childPathStr.length()) {
+     return null;
+   }
+
+   // The parent must be a prefix that ends exactly at a path-separator boundary,
+   // so that "/a/b" does not spuriously match "/a/bc".
+   if (!childPathStr.startsWith(parentPathStr)
+       || childPathStr.charAt(parentPathStr.length()) != '/') {
+     return null;
+   }
+
+   return childPathStr.substring(parentPathStr.length() + 1);
+ }
+
+ /**
+ * Attach the sequence number to a path.
+ *
+ * @param path Path
+ * @param seq sequence number
+ * @param nf Number format
+ * @return New path attached with sequence number
+ * @throws java.io.IOException
+ */
+ private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
+ String[] tokens = path.getName().split("-");
+ if (tokens.length != 4) {
+ throw new IOException("Wrong result file name:" + path);
+ }
+ return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
+ }
+
+ /**
+ * Make sure all files are moved.
+ * Recurses into subdirectories; as a side effect, a subdirectory whose contents
+ * have all been moved is deleted.
+ * @param fs FileSystem
+ * @param stagingPath The staging directory
+ * @return true if no regular file remains anywhere under stagingPath
+ * @throws java.io.IOException
+ */
+ private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
+ FileStatus[] files = fs.listStatus(stagingPath);
+ if (files != null && files.length != 0) {
+ for (FileStatus eachFile: files) {
+ if (eachFile.isFile()) {
+ LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+ return false;
+ } else {
+ // The subtree is clean: remove the now-empty directory.
+ if (verifyAllFileMoved(fs, eachFile.getPath())) {
+ fs.delete(eachFile.getPath(), false);
+ } else {
+ return false;
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * This method sets a rename map which includes renamed staging directory to final output directory recursively.
+ * If some data files already exist, they are deleted to avoid duplicate data.
+ * It also pre-creates the matching recovery (backup) directory for every staging subdirectory.
+ *
+ *
+ * @param fs the file system holding the staging directory
+ * @param stagingPath the staging directory currently being visited
+ * @param outputPath the final output directory
+ * @param stagingParentPathString the staging root path used as the string prefix to rewrite
+ * @param renameDirs out-parameter: staging dir -> final output dir entries to rename
+ * @param oldTableDir directory under which backups of existing data are kept
+ * @throws java.io.IOException
+ */
+ private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
+ String stagingParentPathString,
+ Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
+ FileStatus[] files = fs.listStatus(stagingPath);
+
+ for(FileStatus eachFile : files) {
+ if (eachFile.isDirectory()) {
+ Path oldPath = eachFile.getPath();
+
+ // Make recover directory.
+ // NOTE(review): replaceAll treats stagingParentPathString as a regex; a staging
+ // path containing regex metacharacters would be rewritten incorrectly — confirm.
+ String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
+ oldTableDir.toString());
+ Path recoveryPath = new Path(recoverPathString);
+ if (!fs.exists(recoveryPath)) {
+ fs.mkdirs(recoveryPath);
+ }
+
+ visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
+ renameDirs, oldTableDir);
+ // Find last order partition for renaming
+ String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
+ outputPath.toString());
+ Path newPath = new Path(newPathString);
+ // isLeafDirectory() returns true when the dir still has subdirectories, so this
+ // branch collects the deepest (leaf) partition directories for renaming.
+ if (!isLeafDirectory(fs, eachFile.getPath())) {
+ renameDirs.put(eachFile.getPath(), newPath);
+ } else {
+ if (!fs.exists(newPath)) {
+ fs.mkdirs(newPath);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Returns whether {@code path} directly contains at least one subdirectory.
+  *
+  * <p>NOTE(review): despite its name, this returns {@code true} when the directory is
+  * NOT a leaf (it has a subdirectory child). The caller in visitPartitionedDirectory
+  * relies on this inverted meaning ({@code !isLeafDirectory(...)} selects the deepest
+  * partition directories), so the behavior is intentionally kept as-is.
+  *
+  * @param fs the file system holding {@code path}
+  * @param path the directory to inspect
+  * @return true if any direct child of {@code path} is a directory
+  * @throws IOException if listing the directory fails
+  */
+ private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
+   // Use the FileStatus objects already returned by listStatus() instead of the
+   // deprecated FileSystem#isDirectory(Path), which issues an extra RPC per child.
+   for (FileStatus file : fs.listStatus(path)) {
+     if (file.isDirectory()) {
+       return true;
+     }
+   }
+   return false;
 }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 4635b76..1846ed6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -30,11 +30,9 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.util.Pair;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -84,7 +82,7 @@ public class HashShuffleAppenderManager {
if (!fs.exists(dataFile.getParent())) {
fs.mkdirs(dataFile.getParent());
}
- FileAppender appender = (FileAppender)((FileStorageManager)StorageManager.getFileStorageManager(tajoConf))
+ FileAppender appender = (FileAppender)((FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf))
.getAppender(meta, outSchema, dataFile);
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 68a2cf2..779f908 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
@@ -123,7 +122,7 @@ public class TestCompressionStorages {
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
- Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+ Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -155,7 +154,7 @@ public class TestCompressionStorages {
FileFragment[] tablets = new FileFragment[1];
tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
- Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+ Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
if (storeType.equalsIgnoreCase("CSV")) {
if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 6e15c51..2260d2a 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
@@ -104,7 +103,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
FileFragment fragment = getFileFragment("testErrorTolerance1.json");
- Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
scanner.init();
Tuple tuple;
@@ -126,7 +125,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
FileFragment fragment = getFileFragment("testErrorTolerance1.json");
- Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
scanner.init();
assertNotNull(scanner.next());
@@ -148,7 +147,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
FileFragment fragment = getFileFragment("testErrorTolerance2.json");
- Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
scanner.init();
try {
@@ -167,7 +166,7 @@ public class TestDelimitedTextFile {
TableMeta meta = CatalogUtil.newTableMeta("JSON");
meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
FileFragment fragment = getFileFragment("testErrorTolerance3.json");
- Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
scanner.init();
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
index 7d5eee1..41c6c67 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.*;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
@@ -81,7 +80,7 @@ public class TestFileStorageManager {
Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
fs.mkdirs(path.getParent());
- FileStorageManager fileStorageManager = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ FileStorageManager fileStorageManager = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
Appender appender = fileStorageManager.getAppender(meta, schema, path);
@@ -128,7 +127,7 @@ public class TestFileStorageManager {
}
assertTrue(fs.exists(tablePath));
- FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf);
+ FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf);
assertEquals(fs.getUri(), sm.getFileSystem().getUri());
Schema schema = new Schema();
@@ -182,7 +181,7 @@ public class TestFileStorageManager {
DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
}
assertTrue(fs.exists(tablePath));
- FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf);
+ FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf);
assertEquals(fs.getUri(), sm.getFileSystem().getUri());
Schema schema = new Schema();
@@ -221,11 +220,11 @@ public class TestFileStorageManager {
try {
/* Local FileSystem */
- FileStorageManager sm = (FileStorageManager)StorageManager.getStorageManager(conf, "CSV");
+ FileStorageManager sm = (FileStorageManager) TableSpaceManager.getStorageManager(conf, "CSV");
assertEquals(fs.getUri(), sm.getFileSystem().getUri());
/* Distributed FileSystem */
- sm = (FileStorageManager)StorageManager.getStorageManager(tajoConf, "CSV");
+ sm = (FileStorageManager) TableSpaceManager.getStorageManager(tajoConf, "CSV");
assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri());
} finally {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index b4a60fc..1222fae 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
@@ -58,7 +57,7 @@ public class TestFileSystems {
public TestFileSystems(FileSystem fs) throws IOException {
this.fs = fs;
this.conf = new TajoConf(fs.getConf());
- sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
testDir = getTestDir(this.fs, TEST_PATH);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 1078b84..266f906 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
@@ -66,7 +65,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "line.data");
- FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+ FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -119,7 +118,7 @@ public class TestLineReader {
meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName());
- FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+ FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -177,7 +176,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "testLineDelimitedReader");
- FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+ FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -280,7 +279,7 @@ public class TestLineReader {
TableMeta meta = CatalogUtil.newTableMeta("TEXT");
Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
- FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+ FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
null, null, meta, schema, tablePath);
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 2c856e1..82acaf3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
@@ -95,7 +94,7 @@ public class TestMergeScanner {
conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
- sm = StorageManager.getFileStorageManager(conf);
+ sm = TableSpaceManager.getFileStorageManager(conf);
}
@Test
@@ -115,7 +114,7 @@ public class TestMergeScanner {
}
Path table1Path = new Path(testDir, storeType + "_1.data");
- Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
+ Appender appender1 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
appender1.enableStats();
appender1.init();
int tupleNum = 10000;
@@ -137,7 +136,7 @@ public class TestMergeScanner {
}
Path table2Path = new Path(testDir, storeType + "_2.data");
- Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
+ Appender appender2 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
appender2.enableStats();
appender2.init();