You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/21 08:38:17 UTC
[1/3] TAJO-261: Rearrange default port numbers and config names.
(hyunsik)
Updated Branches:
refs/heads/master 93b435d28 -> d51283284
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index d48ddf4..d89a2fc 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -19,7 +19,6 @@
package org.apache.tajo.storage;
import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,7 +27,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
import org.apache.tajo.util.BitArray;
import java.io.File;
@@ -191,21 +192,30 @@ public class RawFile {
tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
break;
- case TEXT : {
+ case TEXT :
// TODO - should use CharsetEncoder / CharsetDecoder
- byte [] rawBytes = getColumnBytes();
- tuple.put(i, DatumFactory.createText(new String(rawBytes)));
+ //byte [] rawBytes = getColumnBytes();
+ int strSize2 = buffer.getInt();
+ byte [] strBytes2 = new byte[strSize2];
+ buffer.get(strBytes2);
+ tuple.put(i, DatumFactory.createText(new String(strBytes2)));
break;
- }
case BLOB : {
- byte [] rawBytes = getColumnBytes();
+ //byte [] rawBytes = getColumnBytes();
+ int byteSize = buffer.getInt();
+ byte [] rawBytes = new byte[byteSize];
+ buffer.get(rawBytes);
tuple.put(i, DatumFactory.createBlob(rawBytes));
break;
}
case PROTOBUF: {
- byte [] rawBytes = getColumnBytes();
+ //byte [] rawBytes = getColumnBytes();
+ int byteSize = buffer.getInt();
+ byte [] rawBytes = new byte[byteSize];
+ buffer.get(rawBytes);
+
ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
Message.Builder builder = factory.newBuilder();
builder.mergeFrom(rawBytes);
@@ -427,19 +437,42 @@ public class RawFile {
break;
case TEXT:
- case BLOB:
+ byte [] strBytes2 = t.get(i).asByteArray();
+ if (flushBufferAndReplace(recordOffset, strBytes2.length + 4)) {
+ recordOffset = 0;
+ }
+ buffer.putInt(strBytes2.length);
+ buffer.put(strBytes2);
+ break;
+
+ case BLOB : {
+ byte [] rawBytes = t.get(i).asByteArray();
+ if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
+ recordOffset = 0;
+ }
+ buffer.putInt(rawBytes.length);
+ buffer.put(rawBytes);
+ break;
+ }
+
case PROTOBUF: {
- byte [] lengthByte = new byte[4];
- byte [] byteArray = t.get(i).asByteArray();
- CodedOutputStream outputStream = CodedOutputStream.newInstance(lengthByte);
- outputStream.writeUInt32NoTag(byteArray.length);
- outputStream.flush();
- int legnthByteLength = CodedOutputStream.computeInt32SizeNoTag(byteArray.length);
- if (flushBufferAndReplace(recordOffset, byteArray.length + legnthByteLength)) {
+ // TODO - to be fixed
+// byte [] lengthByte = new byte[4];
+// byte [] byteArray = t.get(i).asByteArray();
+// CodedOutputStream outputStream = CodedOutputStream.newInstance(lengthByte);
+// outputStream.writeUInt32NoTag(byteArray.length);
+// outputStream.flush();
+// int legnthByteLength = CodedOutputStream.computeInt32SizeNoTag(byteArray.length);
+// if (flushBufferAndReplace(recordOffset, byteArray.length + legnthByteLength)) {
+// recordOffset = 0;
+// }
+// buffer.put(lengthByte, 0, legnthByteLength);
+ byte [] rawBytes = t.get(i).asByteArray();
+ if (flushBufferAndReplace(recordOffset, rawBytes.length + 4)) {
recordOffset = 0;
}
- buffer.put(lengthByte, 0, legnthByteLength);
- buffer.put(byteArray);
+ buffer.putInt(rawBytes.length);
+ buffer.put(rawBytes);
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
index 8b7c2ca..5d6a298 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
@@ -18,6 +18,9 @@
package org.apache.tajo.storage;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,49 +31,49 @@ import org.apache.tajo.storage.v2.StorageManagerV2;
import java.io.IOException;
import java.net.URI;
-import java.util.HashMap;
import java.util.Map;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
public class StorageManagerFactory {
- private static Map<String, AbstractStorageManager> storageManagers =
- new HashMap<String, AbstractStorageManager>();
+ private static final Map<String, AbstractStorageManager> storageManagers = Maps.newConcurrentMap();
public static AbstractStorageManager getStorageManager(TajoConf conf) throws IOException {
return getStorageManager(conf, null);
}
public static synchronized AbstractStorageManager getStorageManager (
- TajoConf conf, Path dataRoot) throws IOException {
- return getStorageManager(conf, dataRoot, conf.getBoolean("tajo.storage.manager.v2", false));
+ TajoConf conf, Path warehouseDir) throws IOException {
+ return getStorageManager(conf, warehouseDir, conf.getBoolVar(ConfVars.STORAGE_MANAGER_VERSION_2));
}
private static synchronized AbstractStorageManager getStorageManager (
- TajoConf conf, Path dataRoot, boolean v2) throws IOException {
- if(dataRoot != null) {
- conf.setVar(TajoConf.ConfVars.ROOT_DIR, dataRoot.toString());
- }
+ TajoConf conf, Path warehouseDir, boolean v2) throws IOException {
URI uri;
- if(dataRoot == null) {
- uri = FileSystem.get(conf).getUri();
- } else {
- uri = dataRoot.toUri();
+ TajoConf localConf = new TajoConf(conf);
+ if (warehouseDir != null) {
+ localConf.setVar(ConfVars.WAREHOUSE_DIR, warehouseDir.toUri().toString());
}
- String key = "file".equals(uri.getScheme()) ? "file" : uri.getScheme() + uri.getHost() + uri.getPort();
+
+ uri = TajoConf.getWarehouseDir(localConf).toUri();
+
+ String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
if(v2) {
key += "_v2";
}
if(storageManagers.containsKey(key)) {
- return storageManagers.get(key);
+ AbstractStorageManager sm = storageManagers.get(key);
+ return sm;
} else {
- AbstractStorageManager storageManager = null;
+ AbstractStorageManager storageManager;
if(v2) {
- storageManager = new StorageManagerV2(conf);
+ storageManager = new StorageManagerV2(localConf);
} else {
- storageManager = new StorageManager(conf);
+ storageManager = new StorageManager(localConf);
}
storageManagers.put(key, storageManager);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
index 2f97d41..8ecf1de 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
public class DiskFileScanScheduler extends Thread {
private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
@@ -156,7 +158,7 @@ public class DiskFileScanScheduler extends Thread {
private void initScannerPool() {
// TODO: eventually implement a heuristic; currently set from a configuration property
- scanConcurrency = smContext.getConf().getInt("tajo.storage.manager.concurrency.perDisk", 1);
+ scanConcurrency = smContext.getConf().getIntVar(ConfVars.STORAGE_MANAGER_CONCURRENCY_PER_DISK);
}
public int getTotalQueueSize() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java
index 7322c7a..acddf79 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java
@@ -1438,7 +1438,6 @@ public class RCFile {
seekToNextKeyBuffer();
currentRecordLength = readRecordLength();
if (currentRecordLength == -1) {
- //System.out.println(">>>>>currentRecordLength is minus");
keyInit = false;
return -1;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
index a62ed4f..f2da623 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
@@ -275,7 +275,6 @@ public class RCFileScanner extends FileScannerV2 {
public boolean isFetchProcessing() {
//TODO row group size
if(sin != null && sin.getAvaliableSize() > maxBytesPerSchedule * 3) {
-// System.out.println(">>>>>sin.getAvaliableSize()>" + sin.getAvaliableSize());
return true;
} else {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
index 4cd04f3..8763cfa 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
@@ -25,6 +25,8 @@ import org.apache.tajo.storage.v2.StorageManagerV2.StorgaeManagerContext;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
public class ScanScheduler extends Thread {
private static final Log LOG = LogFactory.getLog(ScanScheduler.class);
@@ -63,7 +65,7 @@ public class ScanScheduler extends Thread {
LOG.error(e.getMessage(), e);
}
- final int reportInterval = context.getConf().getInt("tajo.disk.scheduler.report.interval", 60 * 1000);
+ final int reportInterval = context.getConf().getIntVar(ConfVars.STORAGE_MANAGER_DISK_SCHEDULER_REPORT_INTERVAL);
if(reportInterval > 0) {
schedulerStatusReportThread = new Thread() {
public void run() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
index dcf523a..0625d28 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
@@ -112,7 +112,7 @@ public class StorageManagerV2 extends AbstractStorageManager {
}
public int getMaxReadBytesPerScheduleSlot() {
- return conf.getInt("tajo.storage.manager.maxReadBytes", 8 * 1024 * 1024); //8MB
+ return conf.getIntVar(TajoConf.ConfVars.STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT);
}
public void requestFileScan(FileScannerV2 fileScanner) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
index 8827ded..e5dc00f 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
@@ -55,7 +55,7 @@ public class TestCSVCompression {
public TestCSVCompression(CatalogProtos.StoreType type) throws IOException {
this.storeType = type;
conf = new TajoConf();
- conf.set("tajo.storage.manager.v2", "true");
+ conf.setBoolVar(TajoConf.ConfVars.STORAGE_MANAGER_VERSION_2, true);
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
index bf56943..f647a4c 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
@@ -36,6 +36,7 @@ import org.junit.Test;
import java.io.IOException;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
import static org.junit.Assert.assertEquals;
public class TestCSVScanner {
@@ -48,7 +49,7 @@ public class TestCSVScanner {
@Before
public void setUp() throws Exception {
conf = new TajoConf();
- conf.set("tajo.storage.manager.v2", "true");
+ conf.setBoolVar(ConfVars.STORAGE_MANAGER_VERSION_2, true);
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
sm = StorageManagerFactory.getStorageManager(conf, testDir);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
index 3d1b6f7..91dd935 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -63,7 +64,7 @@ public class TestStorages {
this.statsable = statsable;
conf = new TajoConf();
- conf.set("tajo.storage.manager.v2", "true");
+ conf.setBoolVar(ConfVars.STORAGE_MANAGER_VERSION_2, true);
if (storeType == StoreType.RCFILE) {
conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-dist/src/main/bin/tajo
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo
index fb62acf..d4ea664 100755
--- a/tajo-dist/src/main/bin/tajo
+++ b/tajo-dist/src/main/bin/tajo
@@ -31,8 +31,11 @@
# by doing
# export TAJO_USER_CLASSPATH_FIRST=true
#
-# TAJO_HEAPSIZE The maximum amount of heap to use, in MB.
-# Default is 1000.
+# TAJO_MASTER_HEAPSIZE The maximum amount of heap for the TajoMaster to use, in MB.
+# Default is 1000.
+#
+# TAJO_WORKER_HEAPSIZE The maximum amount of heap for the TajoWorker to use, in MB.
+# Default is 1000.
#
# TAJO_OPTS Extra Java runtime options.
@@ -101,15 +104,22 @@ if [ "$JAVA_HOME" = "" ]; then
fi
JAVA=$JAVA_HOME/bin/java
-JAVA_HEAP_MAX=-Xmx1000m
+JAVA_TAJO_MASTER_HEAP_MAX=-Xmx1000m
+JAVA_WORKER_HEAP_MAX=-Xmx1000m
# check envvars which might override default args
-if [ "$TAJO_HEAPSIZE" != "" ]; then
- #echo "run with heapsize $TAJO_HEAPSIZE"
- JAVA_HEAP_MAX="-Xmx""$TAJO_HEAPSIZE""m"
- #echo $JAVA_HEAP_MAX
+if [ "$TAJO_MASTER_HEAPSIZE" != "" ]; then
+ #echo "run with heapsize $TAJO_MASTER_HEAPSIZE"
+ JAVA_TAJO_MASTER_HEAP_MAX="-Xmx""$TAJO_MASTER_HEAPSIZE""m"
+ #echo $JAVA_TAJO_MASTER_HEAP_MAX
+fi
+if [ "$TAJO_WORKER_HEAPSIZE" != "" ]; then
+ #echo "run with heapsize $TAJO_WORKER_HEAPSIZE"
+ JAVA_WORKER_HEAP_MAX="-Xmx""$TAJO_WORKER_HEAPSIZE""m"
+ #echo $JAVA_WORKER_HEAP_MAX
fi
+
##############################################################################
# Hadoop Version Checking Section Start
##############################################################################
@@ -273,10 +283,10 @@ if [ "$COMMAND" = "classpath" ] ; then
exit
elif [ "$COMMAND" = "master" ] ; then
CLASS='org.apache.tajo.master.TajoMaster'
- TAJO_OPTS="$TAJO_OPTS $TAJO_MASTER_OPTS"
+ TAJO_OPTS="$TAJO_OPTS $JAVA_TAJO_MASTER_HEAP_MAX $TAJO_MASTER_OPTS"
elif [ "$COMMAND" = "worker" ] ; then
CLASS='org.apache.tajo.worker.TajoWorker'
- TAJO_OPTS="$TAJO_OPTS $TAJO_WORKER_OPTS"
+ TAJO_OPTS="$TAJO_OPTS $JAVA_WORKER_HEAP_MAX $TAJO_WORKER_OPTS"
elif [ "$COMMAND" = "catalog" ] ; then
CLASS='org.apache.tajo.catalog.CatalogServer'
TAJO_OPTS="$TAJO_OPTS $TAJO_CATALOG_OPTS"
@@ -336,23 +346,5 @@ if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
fi
TAJO_OPTS="$TAJO_OPTS -Dtajo.policy.file=$TAJO_POLICYFILE"
-# Check to see if we should start a secure datanode
-if [ "$starting_secure_dn" = "true" ]; then
- if [ "$TAJO_PID_DIR" = "" ]; then
- TAJO_SECURE_DN_PID="/tmp/tajo_secure_dn.pid"
- else
- TAJO_SECURE_DN_PID="$TAJO_PID_DIR/tajo_secure_dn.pid"
- fi
-
- exec "$TAJO_HOME/bin/jsvc" -Dproc_$COMMAND -outfile "$TAJO_LOG_DIR/jsvc.out" \
- -errfile "$TAJO_LOG_DIR/jsvc.err" \
- -pidfile "$TAJO_SECURE_DN_PID" \
- -nodetach \
- -user "$TAJO_SECURE_DN_USER" \
- -cp "$CLASSPATH" \
- $JAVA_HEAP_MAX $TAJO_OPTS \
- org.apache.tajo.hdfs.server.datanode.SecureDataNodeStarter "$@"
-else
- # run it
- exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $TAJO_OPTS -classpath "$CLASSPATH" $CLASS "$@"
-fi
+# run it
+exec "$JAVA" -Dproc_$COMMAND $TAJO_OPTS -classpath "$CLASSPATH" $CLASS "$@"
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-dist/src/main/conf/tajo-env.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/tajo-env.sh b/tajo-dist/src/main/conf/tajo-env.sh
index 3791904..3654fee 100755
--- a/tajo-dist/src/main/conf/tajo-env.sh
+++ b/tajo-dist/src/main/conf/tajo-env.sh
@@ -21,6 +21,9 @@
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.
+# Hadoop home. Required.
+# export HADOOP_HOME=
+
# The java implementation to use. Required.
# export JAVA_HOME=/usr
@@ -28,11 +31,13 @@
# export TAJO_CLASSPATH=
# The maximum amount of heap for the TajoMaster to use, in MB. Default is 1000.
-# export TAJO_HEAPSIZE=1000
+# export TAJO_MASTER_HEAPSIZE=1000
+
+# The maximum amount of heap for the TajoWorker to use, in MB. Default is 1000.
+# export TAJO_WORKER_HEAPSIZE=1000
# Extra Java runtime options. Empty by default.
# export TAJO_OPTS=-server
-export TAJO_OPTS=-XX:+PrintGCTimeStamps
# Extra TajoMaster's java runtime options for TajoMaster. Empty by default
# export TAJO_MASTER_OPTS=
@@ -52,7 +57,5 @@ export TAJO_OPTS=-XX:+PrintGCTimeStamps
# The scheduling priority for daemon processes. See 'man nice'.
# export TAJO_NICENESS=10
-# Tajo Distributed Execution Mode
-# the default mode is on-demand mode using YarnTajoResourceManager.
-# export TAJO_WORKER_STANDBY_MODE=true
-
+# Tajo cluster mode. The default mode is standby mode.
+export TAJO_WORKER_STANDBY_MODE=true
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
deleted file mode 100644
index 6a43be3..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.util;
-
-import java.net.*;
-
-public class NetUtils {
- public static String normalizeInetSocketAddress(InetSocketAddress addr) {
- return addr.getAddress().getHostAddress() + ":" + addr.getPort();
- }
-
- public static InetSocketAddress createSocketAddr(String addr) {
- String [] splitted = addr.split(":");
- return new InetSocketAddress(splitted[0], Integer.parseInt(splitted[1]));
- }
-
- /**
- * Util method to build socket addr from either:
- * <host>
- * <host>:<port>
- * <fs>://<host>:<port>/<path>
- */
- public static InetSocketAddress createSocketAddr(String host, int port) {
- return new InetSocketAddress(host, port);
- }
-
- public static InetSocketAddress createUnresolved(String addr) {
- String [] splitted = addr.split(":");
- return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
- }
-
- /**
- * Returns InetSocketAddress that a client can use to
- * connect to the server. NettyServerBase.getListenerAddress() is not correct when
- * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
- * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
- *
- * @param addr of a listener
- * @return socket address that a client can use to connect to the server.
- */
- public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
- if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
- try {
- addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
- } catch (UnknownHostException uhe) {
- // shouldn't get here unless the host doesn't have a loopback iface
- addr = new InetSocketAddress("127.0.0.1", addr.getPort());
- }
- }
- return addr;
- }
-
- /**
- * Given an InetAddress, checks to see if the address is a local address, by
- * comparing the address with all the interfaces on the node.
- * @param addr address to check if it is local node's address
- * @return true if the address corresponds to the local node
- */
- public static boolean isLocalAddress(InetAddress addr) {
- // Check if the address is any local or loop back
- boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
-
- // Check if the address is defined on any interface
- if (!local) {
- try {
- local = NetworkInterface.getByInetAddress(addr) != null;
- } catch (SocketException e) {
- local = false;
- }
- }
- return local;
- }
-
- public static String normalizeHost(String host) {
- try {
- InetAddress address = InetAddress.getByName(host);
- if (isLocalAddress(address)) {
- return InetAddress.getLocalHost().getHostAddress();
- } else {
- return address.getHostAddress();
- }
- } catch (UnknownHostException e) {
- }
- return host;
- }
-}
\ No newline at end of file
[2/3] TAJO-261: Rearrange default port numbers and config names.
(hyunsik)
Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index cb35954..e7cb7a0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -512,7 +512,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
LOG.info("Smaller Table's volume is approximately " + mb + " MB");
// determine the number of task
int taskNum = (int) Math.ceil((double)mb /
- conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME));
+ conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME));
LOG.info("The determined number of join partitions is " + taskNum);
return taskNum;
@@ -528,7 +528,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
LOG.info("Table's volume is approximately " + mb + " MB");
// determine the number of task
int taskNum = (int) Math.ceil((double)mb /
- conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME));
+ conf.getIntVar(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME));
LOG.info("The determined number of aggregation partitions is " + taskNum);
return taskNum;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index d1a0c96..db06b88 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -66,8 +67,8 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
this.masterContext = masterContext;
this.queryIdSeed = String.valueOf(System.currentTimeMillis());
- this.queryMasterMemoryMB = masterContext.getConf().getInt("tajo.querymaster.memoryMB", 512);
- this.queryMasterDiskSlot = masterContext.getConf().getInt("tajo.querymaster.diskSlot", 1);
+ this.queryMasterMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_QUERY_MASTER_MEMORY_MB);
+ this.queryMasterDiskSlot = masterContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_QUERY_MASTER_DISKS);
requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
reAllocationList = new ArrayList<WorkerResourceRequest>();
@@ -222,7 +223,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
for(WorkerResource eachWorker: workerResources) {
workerHosts.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
- .setWorkerHostAndPort(eachWorker.getAllocatedHost() + ":" + eachWorker.getManagerPort())
+ .setWorkerHostAndPort(eachWorker.getAllocatedHost() + ":" + eachWorker.getPeerRpcPort())
.setWorkerPullServerPort(eachWorker.getPullServerPort())
.build());
}
@@ -368,7 +369,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
int[] ports = new int[] { request.getTajoWorkerPort(), request.getTajoWorkerClientPort() };
- workerResource.setManagerPort(request.getTajoWorkerPort());
+ workerResource.setPeerRpcPort(request.getTajoWorkerPort());
workerResource.setClientPort(request.getTajoWorkerClientPort());
workerResource.setPullServerPort(request.getTajoWorkerPullServerPort());
workerResource.setHttpPort(request.getTajoWorkerHttpPort());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index a1a4c3e..fad129a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -28,7 +28,7 @@ public class WorkerResource {
private static final Log LOG = LogFactory.getLog(WorkerResource.class);
private String allocatedHost;
- private int managerPort;
+ private int peerRpcPort;
private int clientPort;
private int pullServerPort;
private int httpPort;
@@ -58,11 +58,11 @@ public class WorkerResource {
private long lastHeartbeat;
public String getId() {
- return allocatedHost + ":" + managerPort;
+ return allocatedHost + ":" + peerRpcPort;
}
public void copyId(WorkerResource workerResource) {
- managerPort = workerResource.getManagerPort();
+ peerRpcPort = workerResource.getPeerRpcPort();
allocatedHost = workerResource.getAllocatedHost();
}
@@ -140,7 +140,7 @@ public class WorkerResource {
}
public String portsToStr() {
- return managerPort + "," + clientPort + "," + pullServerPort;
+ return peerRpcPort + "," + clientPort + "," + pullServerPort;
}
public void setLastHeartbeat(long heartbeatTime) {
@@ -222,7 +222,7 @@ public class WorkerResource {
public int getSlots() {
//TODO what is slot? 512MB = 1slot?
- return getMemoryMBSlots()/512;
+ return getMemoryMBSlots() / 512;
}
public int getAvaliableSlots() {
@@ -235,12 +235,12 @@ public class WorkerResource {
return getUsedMemoryMBSlots()/512;
}
- public int getManagerPort() {
- return managerPort;
+ public int getPeerRpcPort() {
+ return peerRpcPort;
}
- public void setManagerPort(int managerPort) {
- this.managerPort = managerPort;
+ public void setPeerRpcPort(int peerRpcPort) {
+ this.peerRpcPort = peerRpcPort;
}
public int getClientPort() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
index 8b2a234..28598d2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -19,6 +19,7 @@
package org.apache.tajo.util;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.QueryUnit;
@@ -28,6 +29,8 @@ import org.apache.tajo.worker.TaskRunner;
import java.text.DecimalFormat;
import java.util.*;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
public class JSPUtil {
static DecimalFormat decimalF = new DecimalFormat("###.0");
@@ -55,9 +58,9 @@ public class JSPUtil {
public static String getTajoMasterHttpAddr(Configuration config) {
try {
- String[] masterAddr = config.get("tajo.master.manager.addr").split(":");
-
- return masterAddr[0] + ":" + config.getInt("tajo.master.http.port", 8080);
+ TajoConf conf = (TajoConf) config;
+ String [] masterAddr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":");
+ return masterAddr[0] + ":" + conf.getVar(ConfVars.TAJO_MASTER_INFO_ADDRESS).split(":")[1];
} catch (Exception e) {
e.printStackTrace();
return e.getMessage();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
index b057561..09426e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
@@ -21,9 +21,12 @@ package org.apache.tajo.webapp;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.worker.TajoWorker;
import org.mortbay.jetty.Connector;
import java.io.IOException;
+import java.net.Inet4Address;
public class StaticHttpServer extends HttpServer {
private static StaticHttpServer instance = null;
@@ -43,7 +46,11 @@ public class StaticHttpServer extends HttpServer {
String addr = bindAddress;
if(instance == null) {
if(bindAddress == null || bindAddress.compareTo("") == 0) {
- addr = conf.getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS).split(":")[0];
+ if (containerObject instanceof TajoMaster) {
+ addr = conf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS).split(":")[0];
+ } else if (containerObject instanceof TajoWorker) {
+ addr = Inet4Address.getLocalHost().getHostName();
+ }
}
instance = new StaticHttpServer(containerObject, name, addr, port,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 385add1..e42de27 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -74,7 +74,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
this.queryTaskContext = queryTaskContext;
executorService = Executors.newFixedThreadPool(
- queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+ queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
}
@Override
@@ -277,7 +277,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
WorkerResource workerResource = new WorkerResource();
workerResource.setAllocatedHost(nodeId.getHost());
- workerResource.setManagerPort(nodeId.getPort());
+ workerResource.setPeerRpcPort(nodeId.getPort());
workerResource.setPullServerPort(eachWorker.getWorkerPullServerPort());
workerResource.setMemoryMBSlots(requiredMemoryMBSlot);
workerResource.setDiskSlots(requiredDiskSlots);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 96ea95e..6152da7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -55,13 +55,15 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
public class TajoWorker extends CompositeService {
public static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
public static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
private static final Log LOG = LogFactory.getLog(TajoWorker.class);
- private TajoConf tajoConf;
+ private TajoConf systemConf;
private StaticHttpServer webServer;
@@ -107,26 +109,24 @@ public class TajoWorker extends CompositeService {
public void init(Configuration conf) {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
- this.tajoConf = (TajoConf)conf;
- RackResolver.init(tajoConf);
+ this.systemConf = (TajoConf)conf;
+ RackResolver.init(systemConf);
workerContext = new WorkerContext();
- String resourceManagerClassName = conf.get("tajo.resource.manager",
- TajoWorkerResourceManager.class.getCanonicalName());
+ String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
boolean randomPort = true;
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
randomPort = false;
}
- int clientPort = tajoConf.getInt("tajo.worker.client.rpc.port", 8091);
- int managerPort = tajoConf.getInt("tajo.worker.manager.rpc.port", 8092);
+ int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
+ int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
if(randomPort) {
clientPort = 0;
- managerPort = 0;
- tajoConf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, 0);
- //infoPort = 0;
+ peerRpcPort = 0;
+ systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
}
if(!"qm".equals(daemonMode)) {
@@ -143,16 +143,16 @@ public class TajoWorker extends CompositeService {
tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
addService(tajoWorkerClientService);
- tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, managerPort);
+ tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
addService(tajoWorkerManagerService);
- LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort="
- + managerPort);
+ LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", peerRpcPort="
+ + peerRpcPort);
- if (!tajoConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
try {
- httpPort = tajoConf.getInt("tajo.worker.http.port", 28080);
+ httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
- true, null, tajoConf, null);
+ true, null, systemConf, null);
webServer.start();
httpPort = webServer.getPort();
LOG.info("Worker info server started:" + httpPort);
@@ -160,8 +160,8 @@ public class TajoWorker extends CompositeService {
LOG.error(e.getMessage(), e);
}
}
- LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort="
- + managerPort);
+ LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", peerRpcPort="
+ + peerRpcPort);
} else {
LOG.info("Tajo worker started: mode=" + daemonMode);
@@ -291,7 +291,7 @@ public class TajoWorker extends CompositeService {
} else if("tr".equals(daemonMode)) { //TaskRunner mode
taskRunnerManager.startTask(params);
} else { //Standby mode
- connectToTajoMaster(tajoConf.get("tajo.master.manager.addr"));
+ connectToTajoMaster(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
connectToCatalog();
workerHeartbeatThread = new WorkerHeartbeatThread();
workerHeartbeatThread.start();
@@ -320,11 +320,8 @@ public class TajoWorker extends CompositeService {
}
private void connectToCatalog() {
- // TODO: To be improved. it's a hack. It assumes that CatalogServer is embedded in TajoMaster.
- String catalogAddr = tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS);
- //int port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS).split(":")[1]);
try {
- catalogClient = new CatalogClient(tajoConf);
+ catalogClient = new CatalogClient(systemConf);
} catch (IOException e) {
e.printStackTrace();
}
@@ -332,16 +329,15 @@ public class TajoWorker extends CompositeService {
class WorkerHeartbeatThread extends Thread {
TajoMasterProtocol.ServerStatusProto.System systemInfo;
- List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
- new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
- int workerDiskSlots;
+ List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+ int workerDisksNum;
List<File> mountPaths;
public WorkerHeartbeatThread() {
- int workerMemoryMBSlots;
- int workerCpuCoreSlots;
+ int workerMemoryMB;
+ int workerCpuCoreNum;
- boolean useSystemInfo = tajoConf.getBoolean("tajo.worker.slots.use.os.info", false);
+ boolean dedicatedResource = systemConf.getBoolVar(ConfVars.WORKER_RESOURCE_DEDICATED);
try {
mountPaths = getMountPath();
@@ -349,29 +345,29 @@ public class TajoWorker extends CompositeService {
LOG.error(e.getMessage(), e);
}
- if(useSystemInfo) {
- float memoryRatio = tajoConf.getFloat("tajo.worker.slots.os.memory.ratio", 0.8f);
- workerMemoryMBSlots = getTotalMemoryMB();
- workerMemoryMBSlots = (int)((float)(workerMemoryMBSlots) * memoryRatio);
- workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
+ if(dedicatedResource) {
+ float dedicatedMemoryRatio = systemConf.getFloatVar(ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO);
+ int totalMemory = getTotalMemoryMB();
+ workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
+ workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
if(mountPaths == null) {
- workerDiskSlots = 2;
+ workerDisksNum = ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
} else {
- workerDiskSlots = mountPaths.size();
+ workerDisksNum = mountPaths.size();
}
} else {
- workerMemoryMBSlots = tajoConf.getInt("tajo.worker.slots.memoryMB", 2048);
- workerDiskSlots = tajoConf.getInt("tajo.worker.slots.disk", 2);
- workerCpuCoreSlots = tajoConf.getInt("tajo.worker.slots.cpu.core", 4);
+ // TODO - it's a hack and it must be fixed
+ //workerMemoryMB = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+ workerMemoryMB = 512 * systemConf.getIntVar(ConfVars.WORKER_EXECUTION_MAX_SLOTS);
+ workerDisksNum = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+ workerCpuCoreNum = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
}
- workerDiskSlots = workerDiskSlots * tajoConf.getInt("tajo.worker.slots.disk.concurrency", 4);
-
systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
- .setAvailableProcessors(workerCpuCoreSlots)
+ .setAvailableProcessors(workerCpuCoreNum)
.setFreeMemoryMB(0)
.setMaxMemoryMB(0)
- .setTotalMemoryMB(workerMemoryMBSlots)
+ .setTotalMemoryMB(workerMemoryMB)
.build();
}
@@ -408,7 +404,7 @@ public class TajoWorker extends CompositeService {
.addAllDisk(diskInfos)
.setRunningTaskNum(taskRunnerManager == null ? 1 : taskRunnerManager.getNumTasks()) //TODO
.setSystem(systemInfo)
- .setDiskSlots(workerDiskSlots)
+ .setDiskSlots(workerDisksNum)
.setJvmHeap(jvmHeap)
.build();
@@ -526,8 +522,6 @@ public class TajoWorker extends CompositeService {
break;
}
- System.out.println(line);
-
int indexStart = line.indexOf(" on /");
int indexEnd = line.indexOf(" ", indexStart + 4);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 177e920..ef10254 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -88,7 +88,6 @@ public class TajoWorkerClientService extends AbstractService {
}
// Get the master address
LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + addr);
- //queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
super.init(conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 4011829..c3e7130 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -30,8 +30,8 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TaskSchedulerImpl;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.querymaster.QueryMaster;
@@ -67,6 +67,7 @@ public class TajoWorkerManagerService extends CompositeService
@Override
public void init(Configuration conf) {
+ TajoConf tajoConf = (TajoConf) conf;
try {
// Setup RPC server
InetSocketAddress initIsa =
@@ -91,9 +92,8 @@ public class TajoWorkerManagerService extends CompositeService
}
// Get the master address
LOG.info("TajoWorkerManagerService is bind to " + addr);
- ((TajoConf)conf).setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
-
- super.init(conf);
+ tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr));
+ super.init(tajoConf);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index d845f4f..e30f8c4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -23,10 +23,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -450,7 +447,7 @@ public class Task {
private Fragment[] localizeFetchedData(File file, String name, TableMeta meta)
throws IOException {
Configuration c = new Configuration(systemConf);
- c.set("fs.default.name", "file:///");
+ c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
FileSystem fs = FileSystem.get(c);
Path tablePath = new Path(file.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 94c1d8f..a5bb55c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -18,7 +18,6 @@
package org.apache.tajo.worker;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -114,8 +113,8 @@ public class TaskRunner extends AbstractService {
try {
final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
- LOG.info("NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
+ LOG.info("Worker Local Dir: " + conf.getVar(ConfVars.WORKER_TEMPORAL_DIR));
UserGroupInformation.setConfiguration(conf);
@@ -132,7 +131,7 @@ public class TaskRunner extends AbstractService {
LOG.info("QueryMaster Address:" + masterAddr);
// TODO - 'load credential' should be implemented
// Getting taskOwner
- UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.TAJO_USERNAME));
+ UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME));
//taskOwner.addToken(token);
// initialize MasterWorkerProtocol as an actual task owner.
@@ -166,14 +165,14 @@ public class TaskRunner extends AbstractService {
try {
// initialize DFS and LocalFileSystems
- defaultFS = TajoConf.getTajoRootPath(systemConf).getFileSystem(conf);
+ defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(conf);
localFS = FileSystem.getLocal(conf);
// the base dir for an output dir
baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId();
// initialize LocalDirAllocator
- lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
+ lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
LOG.info("TaskRunner basedir is created (" + baseDir +")");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 0572aac..85b1e6b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -22,9 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.tajo.QueryId;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -137,11 +135,11 @@ public class TaskRunnerManager extends CompositeService {
class FinishedTaskCleanThread extends Thread {
public void run() {
- int expireIntervalTime = tajoConf.getInt("tajo.worker.history.expire.interval.min", 12 * 60); //12 hour
- LOG.info("FinishedQueryMasterTaskCleanThread started: expireIntervalTime=" + expireIntervalTime);
+ int expireIntervalTime = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
+ LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
while(!stop.get()) {
try {
- Thread.sleep(60 * 1000 * 60); //hourly
+ Thread.sleep(60 * 1000 * 60); // hourly check
} catch (InterruptedException e) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
index 8fc3884..d694b7c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -68,7 +68,7 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
int numClusterNodes = workerContext.getNumClusterNodes();
TajoConf conf = (TajoConf)workerContext.getQueryMaster().getConfig();
- int workerNum = conf.getIntVar(TajoConf.ConfVars.MAX_WORKER_PER_NODE);
+ int workerNum = conf.getIntVar(TajoConf.ConfVars.YARN_RM_WORKER_NUMBER_PER_NODE);
return numClusterNodes == 0 ? numTasks: Math.min(numTasks, numClusterNodes * workerNum);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
deleted file mode 100644
index 43d99ef..0000000
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.client";
-option java_outer_classname = "ClientProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-
-enum ResultCode {
- OK = 0;
- ERROR = 1;
-}
-
-message UpdateSessionVariableRequest {
- optional SessionIdProto sessionId = 1;
- repeated KeyValueProto setVariables = 2;
- repeated string unsetVariables = 3;
-}
-
-message QueryRequest {
- optional SessionIdProto sessionId = 1;
- required string query = 2;
- repeated KeyValueProto setVariables = 3;
-}
-
-message UpdateQueryResponse {
- required ResultCode resultCode = 1;
- optional string errorMessage = 2;
-}
-
-message SubmitQueryResponse {
- required ResultCode resultCode = 1;
- optional string queryId = 2;
- optional string errorMessage = 3;
-}
-
-message GetQueryResultRequest {
- optional SessionIdProto sessionId = 1;
- required string queryId = 2;
-}
-
-message GetQueryResultResponse {
- optional TableDescProto tableDesc = 1;
- optional string errorMessage = 2;
-}
-
-message GetQueryListRequest {
- optional SessionIdProto sessionId = 1;
-}
-
-message BriefQueryStatus {
- required string queryId = 1;
- required QueryState state = 2;
- required int32 executionTime = 3;
-}
-
-message GetQueryListResponse {
- repeated BriefQueryStatus queryList = 1;
-}
-
-message GetQueryStatusRequest {
- optional SessionIdProto sessionId = 1;
- required string queryId = 2;
-}
-
-message GetQueryStatusResponse {
- required ResultCode resultCode = 1;
- required string queryId = 2;
- optional QueryState state = 3;
- optional float progress = 4;
- optional int64 submitTime = 5;
- optional int64 initTime = 6;
- optional int64 finishTime = 7;
- optional bool hasResult = 8;
- optional string errorMessage = 9;
- optional string queryMasterHost = 10;
- optional int32 queryMasterPort = 11;
-}
-
-message GetClusterInfoRequest {
- optional SessionIdProto sessionId = 1;
-}
-
-message GetClusterInfoResponse {
- repeated string serverName = 1;
-}
-
-message GetTableListRequest {
- optional SessionIdProto sessionId = 1;
-}
-
-message GetTableListResponse {
- repeated string tables = 1;
-}
-
-message GetTableDescRequest {
- optional SessionIdProto sessionId = 1;
- required string tableName = 2;
-}
-
-message CreateTableRequest {
- required string name = 1;
- required string path = 2;
- required TableProto meta = 3;
-}
-
-message AttachTableRequest {
- required string name = 1;
- required string path = 2;
-}
-
-message TableResponse {
- optional TableDescProto tableDesc = 1;
- optional string errorMessage = 2;
-}
-
-service ClientProtocolService {
- rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
- rpc submitQuery(QueryRequest) returns (SubmitQueryResponse);
- rpc updateQuery(QueryRequest) returns (UpdateQueryResponse);
- rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
- rpc getQueryList(GetQueryListRequest) returns (GetQueryListResponse);
- rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
- rpc killQuery(StringProto) returns (BoolProto);
- rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
- rpc existTable(StringProto) returns (BoolProto);
- rpc getTableList(GetTableListRequest) returns (GetTableListResponse);
- rpc getTableDesc(GetTableDescRequest) returns (TableResponse);
- rpc createTable(CreateTableRequest) returns (TableResponse);
- rpc dropTable(StringProto) returns (BoolProto);
- rpc attachTable(AttachTableRequest) returns (TableResponse);
- rpc detachTable(StringProto) returns (BoolProto);
-
-
- // TODO - to be implemented
- //
- // authenticate
- //
- // getSessionVariableList
- // dropTable
- // detachTable
- // createIndex
- // dropIndex
- // registerUDF
- // dropUDF
- // listUdfs
- // getUDFDesc
- // registerJars
- // getListRegisteredJars
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index f3b1005..5a70f91 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -131,6 +131,7 @@ message AttachTableRequest {
}
message TableResponse {
- optional TableDescProto tableDesc = 1;
- optional string errorMessage = 2;
+ required ResultCode resultCode = 1;
+ optional TableDescProto tableDesc = 2;
+ optional string errorMessage = 3;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
index 26dbbed..17b237b 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
@@ -41,7 +41,7 @@ service TajoMasterClientProtocolService {
rpc existTable(StringProto) returns (BoolProto);
rpc getTableList(GetTableListRequest) returns (GetTableListResponse);
rpc getTableDesc(GetTableDescRequest) returns (TableResponse);
- rpc createTable(CreateTableRequest) returns (TableResponse);
+ rpc createExternalTable(CreateTableRequest) returns (TableResponse);
rpc dropTable(StringProto) returns (BoolProto);
rpc attachTable(AttachTableRequest) returns (TableResponse);
rpc detachTable(StringProto) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/catalog-default.xml b/tajo-core/tajo-core-backend/src/main/resources/catalog-default.xml
new file mode 100644
index 0000000..52f204a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/catalog-default.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <property>
+ <name>tajo.catalog.store.class</name>
+ <value>org.apache.tajo.catalog.store.DerbyStore</value>
+ </property>
+
+ <property>
+ <name>tajo.catalog.jdbc.uri</name>
+ <value>jdbc:derby:/tmp/tajo-catalog-${user.name}/db;create=true</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index ac17805..ab24740 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -21,91 +21,7 @@
<configuration>
<property>
- <name>tajo.cluster.distributed</name>
- <value>false</value>
- </property>
-
- <property>
- <name>tajo.rootdir</name>
- <value>/tajo</value>
- <description>A base for other temporary directories.</description>
- </property>
-
- <property>
- <name>tajo.staging.root.dir</name>
- <value>/tmp/tajo-${user.name}/staging</value>
- </property>
-
- <property>
- <name>tajo.task.localdir</name>
- <value>/tmp/tajo-localdir</value>
- </property>
-
- <property>
- <name>tajo.master.manager.addr</name>
- <value>127.0.0.1:9004</value>
- </property>
-
- <property>
- <name>tajo.query.session.timeout</name>
- <value>60000</value>
- <description>ms</description>
- </property>
-
- <property>
- <name>tajo.query.session.timeout</name>
- <value>60000</value>
- <description>ms</description>
- </property>
-
- <property>
- <name>tajo.resource.manager</name>
- <value>org.apache.tajo.master.rm.YarnTajoResourceManager</value>
- <description>This can be org.apache.tajo.master.rm.TajoWorkerResourceManager or org.apache.tajo.master.rm.YarnTajoResourceManager</description>
- </property>
-
- <property>
- <name>tajo.querymaster.memoryMB</name>
- <value>512</value>
- <description>the memory slot size for a QueryMaster</description>
- </property>
-
- <property>
- <name>tajo.worker.slots.use.os.info</name>
- <value>true</value>
- <description>If true, Tajo system obtains the physical resource information from OS.
- If false, the physical resource information is obtained from the below configs.</description>
- </property>
-
- <!-- Default Node's Physical information -->
- <!-- The below configs are used if tajo.worker.slots.use.os.info is set to false. -->
- <property>
- <name>tajo.worker.slots.os.memory.ratio</name>
- <value>0.8f</value>
- <description>The ratio of allocatable memory to the total system memory</description>
- </property>
-
- <property>
- <name>tajo.worker.slots.memoryMB</name>
- <value>2048</value>
- <description></description>
- </property>
-
- <property>
- <name>tajo.worker.slots.disk</name>
- <value>2</value>
- <description>The number of disks on a worker</description>
- </property>
-
- <property>
- <name>tajo.worker.slots.disk.concurrency</name>
- <value>4</value>
- <description>the maximum concurrency number per disk slot</description>
- </property>
-
- <property>
- <name>tajo.worker.slots.cpu.core</name>
- <value>4</value>
- <description>The number of CPU cores on a worker</description>
+ <name>tajo.worker.tmpdir.locations</name>
+ <value>/tmp/tajo-${user.name}/tmpdir</value>
</property>
</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
index 20dbc7c..8cf20ae 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
@@ -64,7 +64,7 @@
%>
<tr>
<td width='30' align='right'><%=no++%></td>
- <td><a href='<%=workerHttp%>'><%=worker.getAllocatedHost() + ":" + worker.getManagerPort()%></a></td>
+ <td><a href='<%=workerHttp%>'><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></a></td>
<td width='100'><%=worker.portsToStr()%></td>
<td width='100' align='right'><%=worker.getNumRunningTasks()%></td>
<td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
@@ -101,7 +101,7 @@
%>
<tr>
<td width='30' align='right'><%=no++%></td>
- <td><%=worker.getAllocatedHost() + ":" + worker.getManagerPort()%></td>
+ <td><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></td>
<td><%=worker.portsToStr()%></td>
<td><%=worker.getNumRunningTasks()%></td>
<td><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
index f592c7d..684de7d 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
@@ -1,12 +1,16 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.master.*" %>
-<%@ page import="org.apache.tajo.master.rm.*" %>
-<%@ page import="org.apache.tajo.catalog.*" %>
+<%@ page import="org.apache.hadoop.fs.FileSystem" %>
+<%@ page import="org.apache.tajo.conf.TajoConf" %>
+<%@ page import="org.apache.tajo.master.TajoMaster" %>
<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
+<%@ page import="org.apache.tajo.master.rm.WorkerStatus" %>
<%@ page import="org.apache.tajo.util.NetUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="java.util.Collection" %>
+<%@ page import="java.util.Date" %>
+<%@ page import="java.util.Map" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -79,26 +83,31 @@
<hr/>
<h3>Master Status</h3>
<table border='0'>
- <tr><td width="100">Version:</td><td><%=master.getVersion()%></td></tr>
- <tr><td width="100">Started:</td><td><%=new Date(master.getStartTime())%></td></tr>
- <tr><td width="100">Meta Store:</td><td><%=master.getCatalogServer().getCatalogServerName()%></td></tr>
- <tr><td width="100">Client Service:</td><td><%=NetUtils.normalizeInetSocketAddress(master.getTajoMasterClientService().getBindAddress())%></td></tr>
- <tr><td width='100'>MaxHeap: </td><td><%=Runtime.getRuntime().maxMemory()/1024/1024%> MB</td>
- <tr><td width='100'>TotalHeap: </td><td><%=Runtime.getRuntime().totalMemory()/1024/1024%> MB</td>
- <tr><td width='100'>FreeHeap: </td><td><%=Runtime.getRuntime().freeMemory()/1024/1024%> MB</td>
- <tr><td width="100">Configuration:</td><td><a href='conf.jsp'>detail...</a></td></tr>
- <tr><td width="100">Environment:</td><td><a href='env.jsp'>detail...</a></td></tr>
- <tr><td width="100">Threads:</td><td><a href='thread.jsp'>thread dump...</a></tr>
+ <tr><td width='150'>Version:</td><td><%=master.getVersion()%></td></tr>
+ <tr><td width='150'>Started:</td><td><%=new Date(master.getStartTime())%></td></tr>
+ <tr><td width='150'>File System:</td><td><%=master.getContext().getConf().get(FileSystem.FS_DEFAULT_NAME_KEY)%></td></tr>
+ <tr><td width='150'>Root dir:</td><td><%=TajoConf.getTajoRootDir(master.getContext().getConf())%></td></tr>
+ <tr><td width='150'>System dir:</td><td><%=TajoConf.getSystemDir(master.getContext().getConf())%></td></tr>
+ <tr><td width='150'>Warehouse dir:</td><td><%=TajoConf.getWarehouseDir(master.getContext().getConf())%></td></tr>
+ <tr><td width='150'>Staging dir:</td><td><%=TajoConf.getStagingDir(master.getContext().getConf())%></td></tr>
+ <tr><td width='150'>Client Service:</td><td><%=NetUtils.normalizeInetSocketAddress(master.getTajoMasterClientService().getBindAddress())%></td></tr>
+ <tr><td width='150'>Catalog Service:</td><td><%=master.getCatalogServer().getCatalogServerName()%></td></tr>
+ <tr><td width='150'>MaxHeap: </td><td><%=Runtime.getRuntime().maxMemory()/1024/1024%> MB</td>
+ <tr><td width='150'>TotalHeap: </td><td><%=Runtime.getRuntime().totalMemory()/1024/1024%> MB</td>
+ <tr><td width='150'>FreeHeap: </td><td><%=Runtime.getRuntime().freeMemory()/1024/1024%> MB</td>
+ <tr><td width='150'>Configuration:</td><td><a href='conf.jsp'>detail...</a></td></tr>
+ <tr><td width='150'>Environment:</td><td><a href='env.jsp'>detail...</a></td></tr>
+ <tr><td width='150'>Threads:</td><td><a href='thread.jsp'>thread dump...</a></tr>
</table>
<hr/>
<h3>Cluster Summary</h3>
<table border='0' width="100%">
<tr>
- <td width="100"><a href='cluster.jsp'>Workers:</a></td><td>Total: <%=workers.size()%> Live: <%=numLiveWorkers%> Dead: <%=numDeadWorkers%></td>
+ <td width='150'><a href='cluster.jsp'>Workers:</a></td><td>Total: <%=workers.size()%> Live: <%=numLiveWorkers%> Dead: <%=numDeadWorkers%></td>
</tr>
<tr>
- <td width="100">Task Slots</td><td>Total: <%=totalSlot%> Occupied:<%=runningSlot%> Idle: <%=idleSlot%></td>
+ <td width='150'>Task Slots</td><td>Total: <%=totalSlot%> Occupied:<%=runningSlot%> Idle: <%=idleSlot%></td>
</tr>
</table>
<hr/>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
index 7a5fdee..241c371 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -1,14 +1,18 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
-<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.master.*" %>
-<%@ page import="org.apache.tajo.util.*" %>
+<%@ page import="org.apache.tajo.master.TajoMaster" %>
<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.StringUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.List" %>
+<%@ page import="static org.apache.tajo.conf.TajoConf.ConfVars.WORKER_INFO_ADDRESS" %>
+<%@ page import="org.apache.tajo.conf.TajoConf" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+ TajoConf conf = master.getContext().getConf();
List<QueryInProgress> runningQueries =
JSPUtil.sortQueryInProgress(master.getContext().getQueryJobManager().getRunningQueries(), true);
@@ -17,8 +21,7 @@
JSPUtil.sortQueryInProgress(master.getContext().getQueryJobManager().getFinishedQueries(), true);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- int workerHttpPort = master.getConfig().getInt("tajo.worker.http.port", 28080);
-
+ int workerHttpPort = Integer.valueOf(conf.getVar(WORKER_INFO_ADDRESS).split(":")[1]);
%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index d1efc1c..61a8958 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -21,6 +21,8 @@
*/
package org.apache.tajo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
@@ -38,6 +40,7 @@ import java.io.IOException;
public class BackendTestingUtil {
public final static Schema mockupSchema;
public final static TableMeta mockupMeta;
+ private static final Log LOG = LogFactory.getLog(BackendTestingUtil.class);
static {
mockupSchema = new Schema();
@@ -46,23 +49,20 @@ public class BackendTestingUtil {
mockupMeta = CatalogUtil.newTableMeta(mockupSchema, StoreType.CSV);
}
- public static void writeTmpTable(TajoConf conf, Path path,
- String tableName, boolean writeMeta)
+ public static void writeTmpTable(TajoConf conf, Path tablePath)
throws IOException {
- AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, path);
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, tablePath);
FileSystem fs = sm.getFileSystem();
+
Appender appender;
- Path tablePath = StorageUtil.concatPath(path, tableName, "table.csv");
- if (fs.exists(tablePath.getParent())) {
- fs.delete(tablePath.getParent(), true);
+ Path filePath = new Path(tablePath, "table.csv");
+ if (fs.exists(tablePath)) {
+ fs.delete(tablePath, true);
}
- fs.mkdirs(tablePath.getParent());
+ fs.mkdirs(tablePath);
- if (writeMeta) {
- FileUtil.writeProto(fs, new Path(tablePath.getParent(), ".meta"), mockupMeta.getProto());
- }
- appender = StorageManagerFactory.getStorageManager(conf).getAppender(mockupMeta, tablePath);
+ appender = sm.getAppender(mockupMeta, filePath);
appender.init();
int deptSize = 10000;
@@ -77,12 +77,4 @@ public class BackendTestingUtil {
}
appender.close();
}
-
- public static void writeTmpTable(TajoConf conf, String parent,
- String tableName, boolean writeMeta) throws IOException {
- writeTmpTable(conf, new Path(parent), tableName, writeMeta);
- }
-
- public BackendTestingUtil(TajoConf conf) throws IOException {
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 40fc45f..178e8b3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -73,8 +73,7 @@ public class LocalTajoTestingUtility {
client = new TajoClient(conf);
FileSystem fs = util.getDefaultFileSystem();
- Path rootDir = util.getMaster().
- getStorageManager().getBaseDir();
+ Path rootDir = util.getMaster().getStorageManager().getWarehouseDir();
fs.mkdirs(rootDir);
for (int i = 0; i < tablepaths.length; i++) {
Path localPath = new Path(tablepaths[i]);
@@ -84,7 +83,7 @@ public class LocalTajoTestingUtility {
fs.copyFromLocalFile(localPath, dfsPath);
TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
CatalogProtos.StoreType.CSV, option);
- client.createTable(names[i], tablePath, meta);
+ client.createExternalTable(names[i], tablePath, meta);
}
LOG.info("===================================================");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
index 3a70ac5..be0169d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.pullserver.PullServerAuxService;
-import org.apache.tajo.util.NetUtils;
import java.io.File;
import java.io.IOException;
@@ -108,7 +107,7 @@ public class MiniTajoYarnCluster extends MiniYARNCluster {
conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0);
// local directory
- conf.set(TajoConf.ConfVars.TASK_LOCAL_DIR.name(), "/tmp/tajo-localdir");
+ conf.set(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.name(), "/tmp/tajo-localdir");
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 7f0e407..ffbd3c2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -24,10 +24,8 @@ import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -35,6 +33,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
@@ -88,18 +87,17 @@ public class TajoTestingCluster {
void initPropertiesAndConfigs() {
if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
- String testResourceManager = System.getProperty("tajo.resource.manager");
+ String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
Preconditions.checkState(
testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()) ||
testResourceManager.equals(YarnTajoResourceManager.class.getCanonicalName()),
- "tajo.resource.manager must be either " + TajoWorkerResourceManager.class.getCanonicalName() + " or " +
+ ConfVars.RESOURCE_MANAGER_CLASS.varname + " must be either " + TajoWorkerResourceManager.class.getCanonicalName() + " or " +
YarnTajoResourceManager.class.getCanonicalName() +"."
);
conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
}
- this.standbyWorkerMode =
- conf.get(ConfVars.RESOURCE_MANAGER_CLASS.varname, TajoWorkerResourceManager.class.getCanonicalName())
- .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
+ this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
+ .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
conf.set(CommonTestingUtil.TAJO_TEST, "TRUE");
}
@@ -194,11 +192,8 @@ public class TajoTestingCluster {
// Set this just-started cluster as our filesystem.
this.defaultFS = this.dfsCluster.getFileSystem();
- this.conf.set("fs.defaultFS", defaultFS.getUri().toString());
- // Do old style too just to be safe.
- this.conf.set("fs.default.name", defaultFS.getUri().toString());
-
- this.conf.set(TajoConf.ConfVars.ROOT_DIR.name(), defaultFS.getUri() + "/tajo");
+ this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
+ this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");
return this.dfsCluster;
}
@@ -266,14 +261,13 @@ public class TajoTestingCluster {
final int numSlaves,
boolean local) throws Exception {
TajoConf c = getConfiguration();
- c.setVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS, "localhost:0");
- c.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, "localhost:0");
- c.setVar(ConfVars.TAJO_MASTER_SERVICE_ADDRESS, "localhost:0");
-
+ c.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:0");
+ c.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:0");
+ c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
c.set(CatalogConstants.JDBC_URI, "jdbc:derby:" + testBuildDir.getAbsolutePath() + "/db");
- c.setVar(ConfVars.TASK_LOCAL_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo-localdir");
+ c.setVar(ConfVars.WORKER_TEMPORAL_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo-localdir");
LOG.info("derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
@@ -289,12 +283,12 @@ public class TajoTestingCluster {
tajoMaster.init(c);
tajoMaster.start();
- this.conf.setVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS, c.getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS));
- this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, c.getVar(ConfVars.CLIENT_SERVICE_ADDRESS));
+ this.conf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, c.getVar(ConfVars.WORKER_PEER_RPC_ADDRESS));
+ this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, c.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();
- this.conf.setVar(ConfVars.TAJO_MASTER_SERVICE_ADDRESS,
+ this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
@@ -311,10 +305,9 @@ public class TajoTestingCluster {
TajoConf workerConf = new TajoConf(this.conf);
- workerConf.setInt("tajo.worker.info.port", 0);
- workerConf.setInt("tajo.worker.client.rpc.port", 0);
- workerConf.setInt("tajo.worker.manager.rpc.port", 0);
- workerConf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, 0);
+ workerConf.setVar(ConfVars.WORKER_INFO_ADDRESS, "localhost:0");
+ workerConf.setVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS, "localhost:0");
+ workerConf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
tajoWorker.startWorker(workerConf, new String[]{"standby"});
@@ -524,7 +517,7 @@ public class TajoTestingCluster {
writeLines(tableFile, tables[i]);
TableMeta meta = CatalogUtil
.newTableMeta(schemas[i], CatalogProtos.StoreType.CSV, option);
- client.createTable(tableNames[i], new Path(tableDir.getAbsolutePath()), meta);
+ client.createExternalTable(tableNames[i], new Path(tableDir.getAbsolutePath()), meta);
}
Thread.sleep(1000);
ResultSet res = client.executeQueryAndGetResult(query);
@@ -550,7 +543,7 @@ public class TajoTestingCluster {
FileSystem fs = util.getDefaultFileSystem();
Path rootDir = util.getMaster().
- getStorageManager().getBaseDir();
+ getStorageManager().getWarehouseDir();
fs.mkdirs(rootDir);
for (int i = 0; i < names.length; i++) {
Path tablePath = new Path(rootDir, names[i]);
@@ -563,7 +556,7 @@ public class TajoTestingCluster {
out.close();
TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
CatalogProtos.StoreType.CSV, option);
- client.createTable(names[i], tablePath, meta);
+ client.createExternalTable(names[i], tablePath, meta);
}
Thread.sleep(1000);
ResultSet res = client.executeQueryAndGetResult(query);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 530cc20..09d8ceb 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -20,6 +20,8 @@ package org.apache.tajo.client;
import com.google.common.collect.Sets;
import com.google.protobuf.ServiceException;
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.BackendTestingUtil;
@@ -30,12 +32,12 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.Set;
import static org.junit.Assert.*;
@@ -44,61 +46,54 @@ import static org.junit.Assert.*;
public class TestTajoClient {
private static TajoTestingCluster cluster;
private static TajoConf conf;
- private static TajoClient tajo;
- private static String TEST_PATH = "target/test-data/"
- + TestTajoClient.class.getName();
+ private static TajoClient client;
private static Path testDir;
@BeforeClass
public static void setUp() throws Exception {
cluster = TpchTestBase.getInstance().getTestingCluster();
conf = cluster.getConfiguration();
- tajo = new TajoClient(conf);
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ client = new TajoClient(conf);
+ testDir = CommonTestingUtil.getTestDir();
}
private static Path writeTmpTable(String tableName) throws IOException {
Path tablePath = StorageUtil.concatPath(testDir, tableName);
- BackendTestingUtil.writeTmpTable(conf, testDir, tableName, true);
+ BackendTestingUtil.writeTmpTable(conf, tablePath);
return tablePath;
}
@Test
- public final void testAttachTable() throws IOException, ServiceException {
- final String tableName = "attach";
- Path tablePath = writeTmpTable(tableName);
- assertFalse(tajo.existTable(tableName));
- tajo.attachTable(tableName, tablePath);
- assertTrue(tajo.existTable(tableName));
- tajo.detachTable(tableName);
- assertFalse(tajo.existTable(tableName));
- }
-
- @Test
public final void testUpdateQuery() throws IOException, ServiceException {
final String tableName = "testUpdateQuery";
Path tablePath = writeTmpTable(tableName);
- assertFalse(tajo.existTable(tableName));
+ assertFalse(client.existTable(tableName));
String sql =
"create external table " + tableName + " (deptname text, score integer) "
+ "using csv location '" + tablePath + "'";
- tajo.updateQuery(sql);
- assertTrue(tajo.existTable(tableName));
+ client.updateQuery(sql);
+ assertTrue(client.existTable(tableName));
+ client.detachTable(tableName);
+ assertFalse(client.existTable(tableName));
}
@Test
public final void testCreateAndDropTable()
- throws IOException, ServiceException {
+ throws IOException, ServiceException, SQLException {
final String tableName = "testCreateAndDropTable";
Path tablePath = writeTmpTable(tableName);
-
- assertFalse(tajo.existTable(tableName));
- tajo.createTable(tableName, tablePath, BackendTestingUtil.mockupMeta);
- assertTrue(tajo.existTable(tableName));
- tajo.dropTable(tableName);
- assertFalse(tajo.existTable(tableName));
+ LOG.error("Full path:" + tablePath.toUri().getRawPath());
FileSystem fs = tablePath.getFileSystem(conf);
+ assertTrue(fs.exists(tablePath));
+
+ assertFalse(client.existTable(tableName));
+
+ client.createExternalTable(tableName, tablePath, BackendTestingUtil.mockupMeta);
+ assertTrue(client.existTable(tableName));
+ client.dropTable(tableName);
+ assertFalse(client.existTable(tableName));
+ fs = tablePath.getFileSystem(conf);
assertFalse(fs.exists(tablePath));
}
@@ -107,40 +102,39 @@ public class TestTajoClient {
TajoConf conf = cluster.getConfiguration();
final String tableName = "testCreateAndDropExternalTableByExecuteQuery";
- BackendTestingUtil.writeTmpTable(conf, CommonTestingUtil.getTestDir(), tableName, false);
Path tablePath = writeTmpTable(tableName);
- assertFalse(tajo.existTable(tableName));
+ assertFalse(client.existTable(tableName));
String sql = "create external table " + tableName + " (deptname text, score int4) " + "using csv location '"
+ tablePath + "'";
- tajo.executeQueryAndGetResult(sql);
- assertTrue(tajo.existTable(tableName));
+ client.executeQueryAndGetResult(sql);
+ assertTrue(client.existTable(tableName));
- tajo.updateQuery("drop table " + tableName);
- assertFalse(tajo.existTable(tableName));
+ client.updateQuery("drop table " + tableName);
+ assertFalse(client.existTable(tableName));
FileSystem localFS = FileSystem.getLocal(conf);
assertFalse(localFS.exists(tablePath));
}
@Test
- public final void testCreateAndDropTableByExecuteQuery() throws IOException, ServiceException {
+ public final void testCreateAndDropTableByExecuteQuery() throws IOException, ServiceException, SQLException {
TajoConf conf = cluster.getConfiguration();
final String tableName = "testCreateAndDropTableByExecuteQuery";
- assertFalse(tajo.existTable(tableName));
+ assertFalse(client.existTable(tableName));
String sql = "create table " + tableName + " (deptname text, score int4)";
- tajo.updateQuery(sql);
- assertTrue(tajo.existTable(tableName));
+ client.updateQuery(sql);
+ assertTrue(client.existTable(tableName));
- Path tablePath = tajo.getTableDesc(tableName).getPath();
+ Path tablePath = client.getTableDesc(tableName).getPath();
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
- tajo.updateQuery("drop table " + tableName);
- assertFalse(tajo.existTable(tableName));
+ client.updateQuery("drop table " + tableName);
+ assertFalse(client.existTable(tableName));
assertFalse(hdfs.exists(tablePath));
}
@@ -148,44 +142,51 @@ public class TestTajoClient {
public final void testDDLByExecuteQuery() throws IOException, ServiceException {
TajoConf conf = cluster.getConfiguration();
final String tableName = "testDDLByExecuteQuery";
- BackendTestingUtil.writeTmpTable(conf, CommonTestingUtil.getTestDir(), tableName, false);
+ BackendTestingUtil.writeTmpTable(conf, CommonTestingUtil.getTestDir());
- assertFalse(tajo.existTable(tableName));
+ assertFalse(client.existTable(tableName));
String sql =
"create external table " + tableName + " (deptname text, score int4) "
+ "using csv location 'file:///tmp/" + tableName + "'";
- tajo.executeQueryAndGetResult(sql);
- assertTrue(tajo.existTable(tableName));
+ client.executeQueryAndGetResult(sql);
+ assertTrue(client.existTable(tableName));
}
@Test
public final void testGetTableList() throws IOException, ServiceException {
- final String tableName1 = "table1";
- final String tableName2 = "table2";
- Path table1Path = writeTmpTable(tableName1);
- Path table2Path = writeTmpTable(tableName2);
-
- assertFalse(tajo.existTable(tableName1));
- assertFalse(tajo.existTable(tableName2));
- tajo.attachTable(tableName1, table1Path);
- assertTrue(tajo.existTable(tableName1));
- Set<String> tables = Sets.newHashSet(tajo.getTableList());
- assertTrue(tables.contains(tableName1));
- tajo.attachTable(tableName2, table2Path);
- assertTrue(tajo.existTable(tableName2));
- tables = Sets.newHashSet(tajo.getTableList());
+ String tableName1 = "GetTableList1".toLowerCase();
+ String tableName2 = "GetTableList2".toLowerCase();
+
+ assertFalse(client.existTable(tableName1));
+ assertFalse(client.existTable(tableName2));
+ client.updateQuery("create table GetTableList1 (age int, name text);");
+ client.updateQuery("create table GetTableList2 (age int, name text);");
+
+ assertTrue(client.existTable(tableName1));
+ assertTrue(client.existTable(tableName2));
+
+ Set<String> tables = Sets.newHashSet(client.getTableList());
assertTrue(tables.contains(tableName1));
assertTrue(tables.contains(tableName2));
}
+ Log LOG = LogFactory.getLog(TestTajoClient.class);
+
@Test
- public final void testGetTableDesc() throws IOException, ServiceException {
+ public final void testGetTableDesc() throws IOException, ServiceException, SQLException {
final String tableName1 = "table3";
Path tablePath = writeTmpTable(tableName1);
- assertFalse(tajo.existTable(tableName1));
- tajo.attachTable(tableName1, tablePath);
- assertTrue(tajo.existTable(tableName1));
- TableDesc desc = tajo.getTableDesc(tableName1);
+ LOG.error("Full path:" + tablePath.toUri().getRawPath());
+ FileSystem fs = tablePath.getFileSystem(conf);
+ assertTrue(fs.exists(tablePath));
+
+ assertNotNull(tablePath);
+ assertFalse(client.existTable(tableName1));
+
+ client.createExternalTable("table3", tablePath, BackendTestingUtil.mockupMeta);
+ assertTrue(client.existTable(tableName1));
+
+ TableDesc desc = client.getTableDesc(tableName1);
assertNotNull(desc);
assertEquals(tableName1, desc.getName());
assertTrue(desc.getMeta().getStat().getNumBytes() > 0);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index d5077c2..147fe1d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -70,8 +70,7 @@ public class TestRowFile {
TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.ROWFILE);
- AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf,
- new Path(conf.get(TajoConf.ConfVars.ROOT_DIR.name())));
+ AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR)));
Path tablePath = new Path("/test");
Path metaPath = new Path(tablePath, ".meta");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/test/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/catalog-default.xml b/tajo-core/tajo-core-backend/src/test/resources/catalog-default.xml
deleted file mode 100644
index 05083a4..0000000
--- a/tajo-core/tajo-core-backend/src/test/resources/catalog-default.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.catalog.master.addr</name>
- <value>127.0.0.1:9002</value>
- </property>
-
- <property>
- <name>catalog.store.class</name>
- <value>org.apache.tajo.catalog.store.MemStore</value>
- </property>
-
- <property>
- <name>catalog.jdbc.uri</name>
- <value>jdbc:derby:target/test-data/tcat/db</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
deleted file mode 100644
index 20573d3..0000000
--- a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.cluster.distributed</name>
- <value>false</value>
- </property>
-
- <property>
- <name>tajo.rootdir</name>
- <value>/tajo</value>
- <description>A base for other temporary directories.</description>
- </property>
-
- <property>
- <name>tajo.staging.root.dir</name>
- <value>/tmp/tajo-${user.name}/staging</value>
- </property>
-
- <property>
- <name>tajo.resource.manager</name>
- <value>org.apache.tajo.master.rm.TajoWorkerResourceManager</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index 93cf4e5..9574fc8 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -324,18 +324,13 @@ public class PullServerAuxService extends AbstractService
}
class PullServer extends SimpleChannelUpstreamHandler {
-
private final Configuration conf;
-// private final IndexCache indexCache;
- private final LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
+ private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
private int port;
public PullServer(Configuration conf) {
this.conf = conf;
-// indexCache = new IndexCache(new JobConf(conf));
- this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
- ConfVars.PULLSERVER_PORT.defaultIntVal);
+ this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal);
}
public void setPort(int port) {
@@ -397,7 +392,7 @@ public class PullServerAuxService extends AbstractService
LOG.info("PullServer request param: repartitionType=" + repartitionType +
", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);
- String taskLocalDir = conf.get(ConfVars.TASK_LOCAL_DIR.varname);
+ String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
if (taskLocalDir == null ||
taskLocalDir.equals("")) {
LOG.error("Tajo local directory should be specified.");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 3df3672..256f99c 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -319,7 +319,7 @@ public class TajoPullServerService extends AbstractService {
private final Configuration conf;
// private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
+ new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
private int port;
public PullServer(Configuration conf) {
@@ -390,7 +390,7 @@ public class TajoPullServerService extends AbstractService {
// the working dir of tajo worker for each query
String queryBaseDir = queryId.toString() + "/output";
- LOG.info("PullServer baseDir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname) + "/" + queryBaseDir);
+ LOG.info("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
// if a subquery requires a range partitioning
if (repartitionType.equals("r")) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
index f8cf94d..ef02d2b 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
@@ -45,7 +45,6 @@ public abstract class AbstractStorageManager {
protected final TajoConf conf;
protected final FileSystem fs;
- protected final Path baseDir;
protected final Path tableBaseDir;
protected final boolean blocksMetadataEnabled;
@@ -73,9 +72,8 @@ public abstract class AbstractStorageManager {
protected AbstractStorageManager(TajoConf conf) throws IOException {
this.conf = conf;
- this.baseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR));
- this.tableBaseDir = TajoConf.getWarehousePath(conf);
- this.fs = baseDir.getFileSystem(conf);
+ this.tableBaseDir = TajoConf.getWarehouseDir(conf);
+ this.fs = tableBaseDir.getFileSystem(conf);
this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
if (!this.blocksMetadataEnabled)
@@ -99,11 +97,7 @@ public abstract class AbstractStorageManager {
return this.fs;
}
- public Path getBaseDir() {
- return this.baseDir;
- }
-
- public Path getTableBaseDir() {
+ public Path getWarehouseDir() {
return this.tableBaseDir;
}
[3/3] git commit: TAJO-261: Rearrange default port numbers and config
names. (hyunsik)
Posted by hy...@apache.org.
TAJO-261: Rearrange default port numbers and config names. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d5128328
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d5128328
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d5128328
Branch: refs/heads/master
Commit: d51283284688dfa25550cfe773dfc52391d6b266
Parents: 93b435d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Oct 21 15:37:50 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Oct 21 15:37:50 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tajo/catalog/CatalogConstants.java | 1 -
.../src/main/resources/catalog-default.xml | 32 ----
.../org/apache/tajo/catalog/CatalogServer.java | 8 +-
.../src/test/resources/catalog-default.xml | 37 ----
.../java/org/apache/tajo/TajoConstants.java | 2 +-
.../java/org/apache/tajo/conf/TajoConf.java | 181 +++++++++++++------
.../java/org/apache/tajo/util/NetUtils.java | 102 +++++++++++
.../org/apache/tajo/benchmark/BenchmarkSet.java | 4 +-
.../java/org/apache/tajo/benchmark/TPCH.java | 7 +-
.../main/java/org/apache/tajo/cli/TajoCli.java | 138 +++++++++-----
.../java/org/apache/tajo/client/SQLStates.java | 33 ++++
.../java/org/apache/tajo/client/TajoClient.java | 25 ++-
.../java/org/apache/tajo/client/TajoDump.java | 16 +-
.../apache/tajo/engine/planner/LogicalPlan.java | 4 +-
.../engine/planner/PhysicalPlannerImpl.java | 8 +-
.../engine/planner/global/GlobalPlanner.java | 2 +-
.../planner/physical/ExternalSortExec.java | 2 +-
.../org/apache/tajo/master/GlobalEngine.java | 4 +-
.../java/org/apache/tajo/master/TajoMaster.java | 56 +++---
.../tajo/master/TajoMasterClientService.java | 61 +++++--
.../apache/tajo/master/TajoMasterService.java | 6 +-
.../apache/tajo/master/YarnContainerProxy.java | 4 +-
.../tajo/master/YarnTaskRunnerLauncherImpl.java | 2 +-
.../tajo/master/querymaster/QueryInfo.java | 4 +-
.../master/querymaster/QueryJobManager.java | 3 +-
.../tajo/master/querymaster/QueryMaster.java | 10 +-
.../master/querymaster/QueryMasterTask.java | 7 +-
.../master/querymaster/QueryUnitAttempt.java | 2 -
.../tajo/master/querymaster/Repartitioner.java | 2 +-
.../tajo/master/querymaster/SubQuery.java | 4 +-
.../master/rm/TajoWorkerResourceManager.java | 9 +-
.../apache/tajo/master/rm/WorkerResource.java | 18 +-
.../main/java/org/apache/tajo/util/JSPUtil.java | 9 +-
.../apache/tajo/webapp/StaticHttpServer.java | 9 +-
.../tajo/worker/TajoResourceAllocator.java | 4 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 86 ++++-----
.../tajo/worker/TajoWorkerClientService.java | 1 -
.../tajo/worker/TajoWorkerManagerService.java | 8 +-
.../main/java/org/apache/tajo/worker/Task.java | 7 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 9 +-
.../apache/tajo/worker/TaskRunnerManager.java | 8 +-
.../tajo/worker/YarnResourceAllocator.java | 2 +-
.../src/main/proto/ClientProtocol.proto | 171 ------------------
.../src/main/proto/ClientProtos.proto | 5 +-
.../main/proto/TajoMasterClientProtocol.proto | 2 +-
.../src/main/resources/catalog-default.xml | 32 ++++
.../src/main/resources/tajo-default.xml | 88 +--------
.../main/resources/webapps/admin/cluster.jsp | 4 +-
.../src/main/resources/webapps/admin/index.jsp | 43 +++--
.../src/main/resources/webapps/admin/query.jsp | 15 +-
.../org/apache/tajo/BackendTestingUtil.java | 30 ++-
.../apache/tajo/LocalTajoTestingUtility.java | 5 +-
.../org/apache/tajo/MiniTajoYarnCluster.java | 3 +-
.../org/apache/tajo/TajoTestingCluster.java | 49 +++--
.../org/apache/tajo/client/TestTajoClient.java | 129 ++++++-------
.../org/apache/tajo/storage/TestRowFile.java | 3 +-
.../src/test/resources/catalog-default.xml | 37 ----
.../src/test/resources/tajo-default.xml | 43 -----
.../tajo/pullserver/PullServerAuxService.java | 11 +-
.../tajo/pullserver/TajoPullServerService.java | 4 +-
.../tajo/storage/AbstractStorageManager.java | 12 +-
.../java/org/apache/tajo/storage/RawFile.java | 69 +++++--
.../tajo/storage/StorageManagerFactory.java | 39 ++--
.../tajo/storage/v2/DiskFileScanScheduler.java | 4 +-
.../java/org/apache/tajo/storage/v2/RCFile.java | 1 -
.../apache/tajo/storage/v2/RCFileScanner.java | 1 -
.../apache/tajo/storage/v2/ScanScheduler.java | 4 +-
.../tajo/storage/v2/StorageManagerV2.java | 2 +-
.../tajo/storage/v2/TestCSVCompression.java | 2 +-
.../apache/tajo/storage/v2/TestCSVScanner.java | 3 +-
.../apache/tajo/storage/v2/TestStorages.java | 3 +-
tajo-dist/src/main/bin/tajo | 50 +++--
tajo-dist/src/main/conf/tajo-env.sh | 15 +-
.../java/org/apache/tajo/util/NetUtils.java | 102 -----------
75 files changed, 882 insertions(+), 1038 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7226328..3668ad2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-261: Rearrange default port numbers and config names. (hyunsik)
+
TAJO-236: Implement LogicalPlanVerifier to check if a logical plan is
valid. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 94736f7..d43efbf 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -21,7 +21,6 @@ package org.apache.tajo.catalog;
public class CatalogConstants {
public static final String STORE_CLASS="tajo.catalog.store.class";
- //public static final String JDBC_DRIVER = "tajo.catalog.jdbc.driver";
public static final String CONNECTION_ID = "tajo.catalog.jdbc.connection.id";
public static final String CONNECTION_PASSWORD = "tajo.catalog.jdbc.connection.password";
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml b/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
deleted file mode 100644
index 0e9e109..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.catalog.store.class</name>
- <value>org.apache.tajo.catalog.store.DerbyStore</value>
- </property>
-
- <property>
- <name>tajo.catalog.jdbc.uri</name>
- <value>jdbc:derby:/tmp/tcat-${user.name}/db;create=true</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 70cdd28..672912a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -104,8 +104,7 @@ public class CatalogServer extends AbstractService {
Constructor<?> cons;
try {
- Class<?> storeClass =
- this.conf.getClass(CatalogConstants.STORE_CLASS, DerbyStore.class);
+ Class<?> storeClass = this.conf.getClass(CatalogConstants.STORE_CLASS, DerbyStore.class);
LOG.info("Catalog Store Class: " + storeClass.getCanonicalName());
cons = storeClass.
@@ -123,7 +122,8 @@ public class CatalogServer extends AbstractService {
}
public String getCatalogServerName() {
- return bindAddressStr + ", class=" + this.store.getClass().getSimpleName() + ", jdbc=" + conf.get(CatalogConstants.JDBC_URI);
+ return bindAddressStr + ", store=" + this.store.getClass().getSimpleName() + ", jdbc="
+ + conf.get(CatalogConstants.JDBC_URI);
}
private void initBuiltinFunctions(List<FunctionDesc> functions)
@@ -144,7 +144,7 @@ public class CatalogServer extends AbstractService {
this.bindAddress = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
this.bindAddressStr = NetUtils.normalizeInetSocketAddress(bindAddress);
- conf.set(ConfVars.CATALOG_ADDRESS.varname, bindAddressStr);
+ conf.setVar(ConfVars.CATALOG_ADDRESS, bindAddressStr);
} catch (Exception e) {
LOG.error("CatalogServer startup failed", e);
throw new CatalogException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml b/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
deleted file mode 100644
index a6b2183..0000000
--- a/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.catalog.master.addr</name>
- <value>0.0.0.0:0</value>
- </property>
-
- <property>
- <name>tajo.catalog.store.class</name>
- <value>org.apache.tajo.catalog.store.MemStore</value>
- </property>
-
- <property>
- <name>tajo.catalog.jdbc.uri</name>
- <value>jdbc:derby:target/test-data/tcat/db;create=true</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
index c84b8de..5bce0ae 100644
--- a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
@@ -19,11 +19,11 @@
package org.apache.tajo;
public class TajoConstants {
+ public static String TAJO_VERSION = "0.2.0-SNAPSHOT";
public static String SYSTEM_CONF_FILENAME = "system_conf.xml";
public static String SYSTEM_DIR_NAME = "system";
public static String WAREHOUSE_DIR_NAME = "warehouse";
- public static String STAGING_DIR_NAME = "staging";
public static String SYSTEM_RESOURCE_DIR_NAME = "resource";
public static String RESULT_DIR_NAME="RESULT";
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 8c2585e..475877b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -21,11 +21,14 @@ package org.apache.tajo.conf;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tajo.TajoConstants;
+import org.apache.tajo.util.NetUtils;
import java.io.PrintStream;
+import java.net.InetSocketAddress;
import java.util.Map;
public class TajoConf extends YarnConfiguration {
@@ -57,41 +60,70 @@ public class TajoConf extends YarnConfiguration {
}
public static enum ConfVars {
+
//////////////////////////////////
// Tajo System Configuration
//////////////////////////////////
// a username for a running Tajo cluster
- TAJO_USERNAME("tajo.cluster.username", "tajo"),
+ ROOT_DIR("tajo.rootdir", "file:///tmp/tajo-${user.name}/"),
+ USERNAME("tajo.username", "${user.name}"),
// Configurable System Directories
- ROOT_DIR("tajo.rootdir", "/tajo"),
- WAREHOUSE_DIR("tajo.wahrehouse-dir", EMPTY_VALUE),
- STAGING_ROOT_DIR("tajo.staging.root.dir", ""),
- TASK_LOCAL_DIR("tajo.task.localdir", ""), // local directory for temporal files
- SYSTEM_CONF_PATH("tajo.system.conf.path", ""),
- SYSTEM_CONF_REPLICA_COUNT("tajo.system.conf.replica.count", 20),
-
- // Service Addresses
- TASKRUNNER_LISTENER_ADDRESS("tajo.master.taskrunnerlistener.addr", "0.0.0.0:0"),
- CLIENT_SERVICE_ADDRESS("tajo.master.clientservice.addr", "127.0.0.1:9004"),
- TAJO_MASTER_SERVICE_ADDRESS("tajo.master.manager.addr", "0.0.0.0:9005"),
+ WAREHOUSE_DIR("tajo.warehouse.directory", EMPTY_VALUE),
+ STAGING_ROOT_DIR("tajo.staging.directory", "/tmp/tajo-${user.name}/staging"),
+
+ SYSTEM_CONF_PATH("tajo.system-conf.path", EMPTY_VALUE),
+ SYSTEM_CONF_REPLICA_COUNT("tajo.system-conf.replica-count", 20),
+
+ // Tajo Master Service Addresses
+ TAJO_MASTER_UMBILICAL_RPC_ADDRESS("tajo.master.umbilical-rpc.address", "localhost:26001"),
+ TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002"),
+ TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080"),
+
+ // Tajo Worker Service Addresses
+ WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080"),
+ WORKER_PEER_RPC_ADDRESS("tajo.worker.peer-rpc.address", "0.0.0.0:28091"),
+ WORKER_CLIENT_RPC_ADDRESS("tajo.worker.client-rpc.address", "0.0.0.0:28092"),
+
+ // Tajo Worker Temporary Directories
+ WORKER_TEMPORAL_DIR("tajo.worker.tmpdir.locations", "/tmp/tajo-${user.name}/tmpdir"),
+
+ // Tajo Worker Resources
+ WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1),
+ WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024),
+ WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1),
+ WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
+
+ // Tajo Worker Dedicated Resources
+ WORKER_RESOURCE_DEDICATED("tajo.worker.resource.dedicated", false),
+ WORKER_RESOURCE_DEDICATED_MEMORY_RATIO("tajo.worker.resource.dedicated-memory-ratio", 0.8f),
+
+ // Tajo Worker History
+ WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 12 * 60), // 12 hours
// Resource Manager
- RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.YarnTajoResourceManager"),
+ RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager"),
- // Catalog Address
- CATALOG_ADDRESS("tajo.catalog.master.addr", "0.0.0.0:9002"),
+ // Catalog
+ CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "localhost:26005"),
//////////////////////////////////
- // Worker
+ // for Yarn Resource Manager
//////////////////////////////////
/** how many launching TaskRunners in parallel */
- AM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.master.taskrunnerlauncher.parallel.num", 16),
- MAX_WORKER_PER_NODE("tajo.query.max-workernum.per.node", 8),
+ YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512),
+ YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1),
+ YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", 16),
+ YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8),
+
+ //////////////////////////////////
+ // Query Configuration
+ //////////////////////////////////
+ QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60),
//////////////////////////////////
- // Pull Server
+ // Shuffle Configuration
//////////////////////////////////
PULLSERVER_PORT("tajo.pullserver.port", 0),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
@@ -103,34 +135,43 @@ public class TajoConf extends YarnConfiguration {
// for RCFile
HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true),
-
- //////////////////////////////////
- // Physical Executors
- //////////////////////////////////
- EXTENAL_SORT_BUFFER_NUM("tajo.sort.external.buffer", 1000000),
- BROADCAST_JOIN_THRESHOLD("tajo.join.broadcast.threshold", (long)5 * 1048576),
- INMEMORY_HASH_TABLE_DEFAULT_SIZE("tajo.join.inmemory.table.num", (long)1000000),
- INMEMORY_INNER_HASH_JOIN_THRESHOLD("tajo.join.inner.memhash.threshold", (long)256 * 1048576),
- INMEMORY_HASH_AGGREGATION_THRESHOLD("tajo.aggregation.hash.threshold", (long)256 * 1048576),
- INMEMORY_OUTER_HASH_AGGREGATION_THRESHOLD("tajo.join.outer.memhash.threshold", (long)256 * 1048576),
+ // for Storage Manager v2
+ STORAGE_MANAGER_VERSION_2("tajo.storage-manager.v2", false),
+ STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT("tajo.storage-manager.max-read-bytes", 8 * 1024 * 1024),
+ STORAGE_MANAGER_DISK_SCHEDULER_REPORT_INTERVAL("tajo.storage-manager.disk-scheduler.report-interval", 60 * 1000),
+ STORAGE_MANAGER_CONCURRENCY_PER_DISK("tajo.storage-manager.disk-scheduler.per-disk-concurrency", 2),
//////////////////////////////////////////
// Distributed Query Execution Parameters
//////////////////////////////////////////
- JOIN_TASK_VOLUME("tajo.join.task-volume.mb", 128),
- SORT_TASK_VOLUME("tajo.sort.task-volume.mb", 128),
- AGGREGATION_TASK_VOLUME("tajo.task-aggregation.volume.mb", 128),
+ DIST_QUERY_BROADCAST_JOIN_THRESHOLD("tajo.dist-query.join.broadcast.threshold-bytes", (long)5 * 1048576),
+
+ DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128),
+ DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 128),
+ DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 128),
- JOIN_PARTITION_VOLUME("tajo.join.part-volume.mb", 128),
- SORT_PARTITION_VOLUME("tajo.sort.part-volume.mb", 256),
- AGGREGATION_PARTITION_VOLUME("tajo.aggregation.part-volume.mb", 256),
+ DIST_QUERY_JOIN_PARTITION_VOLUME("tajo.dist-query.join.partition-volume-mb", 128),
+ DIST_QUERY_SORT_PARTITION_VOLUME("tajo.dist-query.sort.partition-volume-mb", 256),
+ DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256),
+
+ //////////////////////////////////
+ // Physical Executors
+ //////////////////////////////////
+ EXECUTOR_SORT_EXTENAL_BUFFER_SIZE("tajo.executor.sort.external-buffer-num", 1000000),
+ EXECUTOR_INNER_JOIN_INMEMORY_HASH_TABLE_SIZE("tajo.executor.join.inner.in-memory-table-num", (long)1000000),
+ EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-bytes",
+ (long)256 * 1048576),
+ EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-bytes",
+ (long)256 * 1048576),
+ EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes",
+ (long)256 * 1048576),
//////////////////////////////////
// The Below is reserved
//////////////////////////////////
// GeoIP
- GEOIP_DATA("tajo.geoip.data", "/usr/local/share/GeoIP/GeoIP.dat"),
+ GEOIP_DATA("tajo.geoip.data", ""),
//////////////////////////////////
// Hive Configuration
@@ -205,13 +246,10 @@ public class TajoConf extends YarnConfiguration {
enum VarType {
STRING { void checkType(String value) throws Exception { } },
- INT { void checkType(String value) throws Exception { Integer
- .valueOf(value); } },
+ INT { void checkType(String value) throws Exception { Integer.valueOf(value); } },
LONG { void checkType(String value) throws Exception { Long.valueOf(value); } },
- FLOAT { void checkType(String value) throws Exception { Float
- .valueOf(value); } },
- BOOLEAN { void checkType(String value) throws Exception { Boolean
- .valueOf(value); } };
+ FLOAT { void checkType(String value) throws Exception { Float.valueOf(value); } },
+ BOOLEAN { void checkType(String value) throws Exception { Boolean.valueOf(value); } };
boolean isType(String value) {
try { checkType(value); } catch (Exception e) { return false; }
@@ -338,22 +376,27 @@ public class TajoConf extends YarnConfiguration {
}
}
+ public InetSocketAddress getSocketAddrVar(ConfVars var) {
+ final String address = getVar(var);
+ return NetUtils.createSocketAddr(address);
+ }
+
/////////////////////////////////////////////////////////////////////////////
// Tajo System Specific Methods
/////////////////////////////////////////////////////////////////////////////
- public static Path getTajoRootPath(TajoConf conf) {
+ public static Path getTajoRootDir(TajoConf conf) {
String rootPath = conf.getVar(ConfVars.ROOT_DIR);
Preconditions.checkNotNull(rootPath,
ConfVars.ROOT_DIR.varname + " must be set before a Tajo Cluster starts up");
return new Path(rootPath);
}
- public static Path getWarehousePath(TajoConf conf) {
+ public static Path getWarehouseDir(TajoConf conf) {
String warehousePath = conf.getVar(ConfVars.WAREHOUSE_DIR);
if (warehousePath == null || warehousePath.equals("")) {
- Path rootDir = getTajoRootPath(conf);
- warehousePath = new Path(rootDir, TajoConstants.WAREHOUSE_DIR_NAME).toString();
+ Path rootDir = getTajoRootDir(conf);
+ warehousePath = new Path(rootDir, TajoConstants.WAREHOUSE_DIR_NAME).toUri().toString();
conf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath);
return new Path(warehousePath);
} else {
@@ -361,25 +404,45 @@ public class TajoConf extends YarnConfiguration {
}
}
- public static Path getSystemPath(TajoConf conf) {
- Path rootPath = getTajoRootPath(conf);
+ public static Path getSystemDir(TajoConf conf) {
+ Path rootPath = getTajoRootDir(conf);
return new Path(rootPath, TajoConstants.SYSTEM_DIR_NAME);
}
- public static Path getSystemResourcePath(TajoConf conf) {
- return new Path(getSystemPath(conf), TajoConstants.SYSTEM_RESOURCE_DIR_NAME);
+ public static Path getSystemResourceDir(TajoConf conf) {
+ return new Path(getSystemDir(conf), TajoConstants.SYSTEM_RESOURCE_DIR_NAME);
}
- public static Path getStagingRoot(TajoConf conf) {
- String stagingRootDir = conf.getVar(ConfVars.STAGING_ROOT_DIR);
- Preconditions.checkState(stagingRootDir != null && !stagingRootDir.equals(""),
- TajoConstants.STAGING_DIR_NAME + " must be set before starting a Tajo Cluster starts up");
- return new Path(stagingRootDir);
+ private static boolean hasScheme(String path) {
+ return path.indexOf("file:/") == 0 || path.indexOf("hdfs:/") == 0;
}
- public static Path getSystemConf(TajoConf conf) {
- String stagingRootDir = conf.getVar(ConfVars.SYSTEM_CONF_PATH);
- Preconditions.checkNotNull(stagingRootDir, ConfVars.SYSTEM_CONF_PATH.varname + " is not set.");
- return new Path(stagingRootDir);
+ public static Path getStagingDir(TajoConf conf) { // returns STAGING_ROOT_DIR as a Path
+ String stagingDirString = conf.getVar(ConfVars.STAGING_ROOT_DIR);
+ if (!hasScheme(stagingDirString)) { // no file:/ or hdfs:/ prefix: prepend the warehouse file system URI
+ Path warehousePath = getWarehouseDir(conf);
+ FileSystem fs;
+ try {
+ fs = warehousePath.getFileSystem(conf);
+ } catch (Throwable e) {
+ throw new RuntimeException("Cannot get the file system of " + warehousePath, e); // keep the cause
+ }
+ Path path = new Path(fs.getUri().toString(), stagingDirString);
+ conf.setVar(ConfVars.STAGING_ROOT_DIR, path.toString()); // write the resolved value back to conf
+ return path;
+ }
+ return new Path(stagingDirString);
+ }
+
+ public static Path getSystemConfPath(TajoConf conf) {
+ String systemConfPathStr = conf.getVar(ConfVars.SYSTEM_CONF_PATH);
+ if (systemConfPathStr == null || systemConfPathStr.equals("")) {
+ Path systemResourcePath = getSystemResourceDir(conf);
+ Path systemConfPath = new Path(systemResourcePath, TajoConstants.SYSTEM_CONF_FILENAME);
+ conf.setVar(ConfVars.SYSTEM_CONF_PATH, systemConfPath.toString());
+ return systemConfPath;
+ } else {
+ return new Path(systemConfPathStr);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
new file mode 100644
index 0000000..6a43be3
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NetUtils.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import java.net.*;
+
+public class NetUtils {
+ public static String normalizeInetSocketAddress(InetSocketAddress addr) {
+ return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ }
+
+ /** Builds a socket address from a "host:port" string; no other form is parsed here. */
+ public static InetSocketAddress createSocketAddr(String addr) {
+ String [] splitted = addr.split(":"); // [0] is used as the host, [1] as the port
+ return new InetSocketAddress(splitted[0], Integer.parseInt(splitted[1]));
+ }
+
+ /**
+ * Builds a socket address from a separately supplied host and port.
+ * @param host host passed to the InetSocketAddress constructor
+ * @param port port passed to the InetSocketAddress constructor
+ */
+ public static InetSocketAddress createSocketAddr(String host, int port) {
+ return new InetSocketAddress(host, port);
+ }
+
+ public static InetSocketAddress createUnresolved(String addr) {
+ String [] splitted = addr.split(":");
+ return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
+ }
+
+ /**
+ * Returns InetSocketAddress that a client can use to
+ * connect to the server. NettyServerBase.getListenerAddress() is not correct when
+ * the server binds to "0.0.0.0". This returns "hostname:port" of the server,
+ * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port".
+ *
+ * @param addr of a listener
+ * @return socket address that a client can use to connect to the server.
+ */
+ public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
+ if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) {
+ try {
+ addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
+ } catch (UnknownHostException uhe) {
+ // shouldn't get here unless the host doesn't have a loopback iface
+ addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+ }
+ }
+ return addr;
+ }
+
+ /**
+ * Given an InetAddress, checks to see if the address is a local address, by
+ * comparing the address with all the interfaces on the node.
+ * @param addr address to check if it is local node's address
+ * @return true if the address corresponds to the local node
+ */
+ public static boolean isLocalAddress(InetAddress addr) {
+ // Check if the address is any local or loop back
+ boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+ // Check if the address is defined on any interface
+ if (!local) {
+ try {
+ local = NetworkInterface.getByInetAddress(addr) != null;
+ } catch (SocketException e) {
+ local = false;
+ }
+ }
+ return local;
+ }
+
+ public static String normalizeHost(String host) { // resolves host and returns an address string
+ try {
+ InetAddress address = InetAddress.getByName(host);
+ if (isLocalAddress(address)) { // local hosts are reported with the local host's address
+ return InetAddress.getLocalHost().getHostAddress();
+ } else {
+ return address.getHostAddress();
+ }
+ } catch (UnknownHostException e) { // resolution failed: fall through and return the input unchanged
+ }
+ return host;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index af3d002..929dae8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -42,9 +42,9 @@ public abstract class BenchmarkSet {
public void init(TajoConf conf, String dataDir) throws IOException {
this.dataDir = dataDir;
- if (System.getProperty(ConfVars.TASKRUNNER_LISTENER_ADDRESS.varname) != null) {
+ if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) {
tajo = new TajoClient(NetUtils.createSocketAddr(
- System.getProperty(ConfVars.TASKRUNNER_LISTENER_ADDRESS.varname)));
+ System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname)));
} else {
conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName());
tajo = new TajoClient(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
index 79915bb..7b33b09 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -30,6 +30,7 @@ import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.storage.CSVFile;
import java.io.IOException;
+import java.sql.SQLException;
public class TPCH extends BenchmarkSet {
private final Log LOG = LogFactory.getLog(TPCH.class);
@@ -166,6 +167,10 @@ public class TPCH extends BenchmarkSet {
private void loadTable(String tableName) throws ServiceException {
TableMeta meta = CatalogUtil.newTableMeta(getSchema(tableName), StoreType.CSV);
meta.putOption(CSVFile.DELIMITER, "|");
- tajo.createTable(tableName, new Path(dataDir, tableName), meta);
+ try {
+ tajo.createExternalTable(tableName, new Path(dataDir, tableName), meta);
+ } catch (SQLException s) {
+ throw new ServiceException(s);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index 8980eb7..984f6e4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -18,7 +18,6 @@
package org.apache.tajo.cli;
-import com.google.protobuf.ServiceException;
import jline.console.ConsoleReader;
import jline.console.history.FileHistory;
import jline.console.history.PersistentHistory;
@@ -26,6 +25,7 @@ import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
@@ -64,9 +64,10 @@ public class TajoCli {
private static final Class [] registeredCommands = {
DescTableCommand.class,
HelpCommand.class,
- AttachCommand.class,
DetachCommand.class,
- ExitCommand.class
+ ExitCommand.class,
+ Copyright.class,
+ Version.class
};
private static final String HOME_DIR = System.getProperty("user.home");
@@ -101,17 +102,17 @@ public class TajoCli {
// if there is no "-h" option,
if(hostName == null) {
- if (conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS) != null) {
+ if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
// it checks if the client service address is given in configuration and distributed mode.
// if so, it sets entryAddr.
- hostName = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS).split(":")[0];
+ hostName = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
}
}
if (port == null) {
- if (conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS) != null) {
+ if (conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
// it checks if the client service address is given in configuration and distributed mode.
// if so, it sets entryAddr.
- port = Integer.parseInt(conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS).split(":")[1]);
+ port = Integer.parseInt(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
}
}
@@ -119,7 +120,7 @@ public class TajoCli {
System.err.println("ERROR: cannot find valid Tajo server address");
System.exit(-1);
} else if (hostName != null && port != null) {
- conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, hostName+":"+port);
+ conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
client = new TajoClient(conf);
} else if (hostName == null && port == null) {
client = new TajoClient(conf);
@@ -254,12 +255,10 @@ public class TajoCli {
private void invokeCommand(String [] cmds) {
// this command should be moved to GlobalEngine
- Command invoked = null;
+ Command invoked;
try {
invoked = commands.get(cmds[0]);
invoked.invoke(cmds);
- } catch (IllegalArgumentException iae) {
- sout.println("usage: " + invoked.getCommand() + " " + invoked.getUsage());
} catch (Throwable t) {
sout.println(t.getMessage());
}
@@ -284,7 +283,7 @@ public class TajoCli {
sout.flush();
((PersistentHistory)this.reader.getHistory()).flush();
System.exit(0);
- } else if (cmds[0].equalsIgnoreCase("attach") || cmds[0].equalsIgnoreCase("detach")) {
+ } else if (cmds[0].equalsIgnoreCase("detach") && cmds.length > 1 && cmds[1].equalsIgnoreCase("table")) {
// this command should be moved to GlobalEngine
invokeCommand(cmds);
@@ -496,7 +495,7 @@ public class TajoCli {
@Override
public String getUsage() {
- return "[TB_NAME]";
+ return "[table_name]";
}
@Override
@@ -514,13 +513,28 @@ public class TajoCli {
@Override
public void invoke(String[] cmd) throws Exception {
- for (Map.Entry<String,Command> entry : commands.entrySet()) {
- sout.print(entry.getKey());
- sout.print(" ");
- sout.print(entry.getValue().getUsage());
- sout.print("\t");
- sout.println(entry.getValue().getDescription());
- }
+ sout.println();
+
+ sout.println("General");
+ sout.println(" \\copyright show Apache License 2.0");
+ sout.println(" \\version show Tajo version");
+ sout.println(" \\? show help");
+ sout.println(" \\q quit tsql");
+ sout.println();
+ sout.println();
+
+ sout.println("Informational");
+ sout.println(" \\d list tables");
+ sout.println(" \\d NAME describe table");
+ sout.println();
+ sout.println();
+
+ sout.println("Documentations");
+ sout.println(" tsql guide http://wiki.apache.org/tajo/tsql");
+ sout.println(" Query language http://wiki.apache.org/tajo/QueryLanguage");
+ sout.println(" Functions http://wiki.apache.org/tajo/Functions");
+ sout.println(" Backup & restore http://wiki.apache.org/tajo/BackupAndRestore");
+ sout.println(" Configuration http://wiki.apache.org/tajo/Configuration");
sout.println();
}
@@ -536,65 +550,97 @@ public class TajoCli {
}
// TODO - This should be dealt as a DDL statement instead of a command
- public class AttachCommand extends Command {
+ public class DetachCommand extends Command {
@Override
public String getCommand() {
- return "attach";
+ return "detach";
}
@Override
public void invoke(String[] cmd) throws Exception {
if (cmd.length != 3) {
- throw new IllegalArgumentException();
- }
- if (!client.existTable(cmd[1])) {
- client.attachTable(cmd[1], cmd[2]);
- sout.println("attached " + cmd[1] + " (" + cmd[2] + ")");
+ throw new IllegalArgumentException("usage: detach table [tb_name]");
} else {
- sout.println("ERROR: relation \"" + cmd[1] + "\" already exists");
+ if (client.existTable(cmd[2])) { // cmd is {"detach", "table", <table name>}
+ client.detachTable(cmd[2]);
+ sout.println("Table \"" + cmd[2] + "\" is detached.");
+ } else {
+ sout.println("ERROR: table \"" + cmd[2] + "\" does not exist");
+ }
}
}
@Override
public String getUsage() {
- return "TB_NAME PATH";
+ return "table [table_name]";
}
@Override
public String getDescription() {
- return "attach a existing table as a given table name";
+ return "detach a table, but it does not remove the table directory.";
}
}
- // TODO - This should be dealt as a DDL statement instead of a command
- public class DetachCommand extends Command {
+ public class Version extends Command {
+
@Override
public String getCommand() {
- return "detach";
+ return "\\version";
}
@Override
public void invoke(String[] cmd) throws Exception {
- if (cmd.length != 2) {
- throw new IllegalArgumentException();
- } else {
- if (client.existTable(cmd[1])) {
- client.detachTable(cmd[1]);
- sout.println("detached " + cmd[1] + " from tajo");
- } else {
- sout.println("ERROR: table \"" + cmd[1] + "\" does not exist");
- }
- }
+ sout.println(TajoConstants.TAJO_VERSION);
}
@Override
public String getUsage() {
- return "TB_NAME";
+ return "";
}
@Override
public String getDescription() {
- return "detach a table, but it does not remove the table directory.";
+ return "show Tajo version";
+ }
+ }
+
+ public class Copyright extends Command {
+
+ @Override
+ public String getCommand() {
+ return "\\copyright";
+ }
+
+ @Override
+ public void invoke(String[] cmd) throws Exception {
+ sout.println();
+ sout.println(
+ " Licensed to the Apache Software Foundation (ASF) under one\n" +
+ " or more contributor license agreements. See the NOTICE file\n" +
+ " distributed with this work for additional information\n" +
+ " regarding copyright ownership. The ASF licenses this file\n" +
+ " to you under the Apache License, Version 2.0 (the\n" +
+ " \"License\"); you may not use this file except in compliance\n" +
+ " with the License. You may obtain a copy of the License at\n" +
+ "\n" +
+ " http://www.apache.org/licenses/LICENSE-2.0\n" +
+ "\n" +
+ " Unless required by applicable law or agreed to in writing, software\n" +
+ " distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
+ " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
+ " See the License for the specific language governing permissions and\n" +
+ " limitations under the License.");
+ sout.println();
+ }
+
+ @Override
+ public String getUsage() {
+ return "";
+ }
+
+ @Override
+ public String getDescription() {
+ return "show Apache License 2.0";
}
}
@@ -637,7 +683,7 @@ public class TajoCli {
try {
invoked.invoke(arguments);
} catch (IllegalArgumentException ige) {
- sout.println("usage: " + invoked.getCommand() + " " + invoked.getUsage());
+ sout.println(ige.getMessage());
} catch (Exception e) {
sout.println(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/SQLStates.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/SQLStates.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/SQLStates.java
new file mode 100644
index 0000000..888170b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/SQLStates.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+public enum SQLStates { // SQLSTATE codes passed to java.sql.SQLException by the Tajo client
+ ER_NO_SUCH_TABLE("42S02");
+
+ private final String state; // the SQLSTATE string given to the constant; never reassigned
+
+ SQLStates(String state) {
+ this.state = state;
+ }
+
+ public String getState() {
+ return state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index 19fa618..d94c99f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -43,6 +43,7 @@ import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +65,7 @@ public class TajoClient {
public TajoClient(TajoConf conf) throws IOException {
this.conf = conf;
this.conf.set("tajo.disk.scheduler.report.interval", "0");
- String masterAddr = this.conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
+ String masterAddr = this.conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
InetSocketAddress addr = NetUtils.createSocketAddr(masterAddr);
connect(addr);
}
@@ -299,14 +300,18 @@ public class TajoClient {
return tajoMasterService.detachTable(null, builder.build()).getValue();
}
- public TableDesc createTable(String name, Path path, TableMeta meta)
- throws ServiceException {
+ public TableDesc createExternalTable(String name, Path path, TableMeta meta)
+ throws SQLException, ServiceException {
CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
builder.setName(name);
- builder.setPath(path.toString());
+ builder.setPath(path.toUri().toString());
builder.setMeta(meta.getProto());
- TableResponse res = tajoMasterService.createTable(null, builder.build());
- return CatalogUtil.newTableDesc(res.getTableDesc());
+ TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
+ if (res.getResultCode() == ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
+ throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+ }
}
public boolean dropTable(String name) throws ServiceException {
@@ -325,14 +330,14 @@ public class TajoClient {
return res.getTablesList();
}
- public TableDesc getTableDesc(String tableName) throws ServiceException {
+ public TableDesc getTableDesc(String tableName) throws SQLException, ServiceException {
GetTableDescRequest.Builder build = GetTableDescRequest.newBuilder();
build.setTableName(tableName);
TableResponse res = tajoMasterService.getTableDesc(null, build.build());
- if (res == null) {
- return null;
- } else {
+ if (res.getResultCode() == ResultCode.OK) {
return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
+ throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java
index 84d5646..486ff9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoDump.java
@@ -21,16 +21,14 @@ package org.apache.tajo.client;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import org.apache.commons.cli.*;
-import org.apache.commons.cli.Options;
import org.apache.tajo.catalog.DDLBuilder;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.function.builtin.Date;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
+import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
@@ -51,7 +49,7 @@ public class TajoDump {
formatter.printHelp( "tajo_dump [options] [table_name]", options );
}
- public static void main(String [] args) throws ParseException, IOException, ServiceException {
+ public static void main(String [] args) throws ParseException, IOException, ServiceException, SQLException {
TajoConf conf = new TajoConf();
CommandLineParser parser = new PosixParser();
@@ -68,17 +66,17 @@ public class TajoDump {
// if there is no "-h" option,
if(hostName == null) {
- if (conf.getVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS) != null) {
+ if (conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
// it checks if the client service address is given in configuration and distributed mode.
// if so, it sets entryAddr.
- hostName = conf.getVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS).split(":")[0];
+ hostName = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[0];
}
}
if (port == null) {
- if (conf.getVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS) != null) {
+ if (conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS) != null) {
// it checks if the client service address is given in configuration and distributed mode.
// if so, it sets entryAddr.
- port = Integer.parseInt(conf.getVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS).split(":")[1]);
+ port = Integer.parseInt(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS).split(":")[1]);
}
}
@@ -87,7 +85,7 @@ public class TajoDump {
System.err.println("ERROR: cannot find valid Tajo server address");
System.exit(-1);
} else if (hostName != null && port != null) {
- conf.setVar(TajoConf.ConfVars.CLIENT_SERVICE_ADDRESS, hostName+":"+port);
+ conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
client = new TajoClient(conf);
} else if (hostName == null && port == null) {
client = new TajoClient(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index df6081d..4a305ae 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -84,7 +84,9 @@ public class LogicalPlan {
}
public String newNonameColumnName(String prefix) {
- return "?" + prefix + "_" + (noNameColumnId++);
+ String suffix = noNameColumnId == 0 ? "" : String.valueOf(noNameColumnId);
+ noNameColumnId++;
+ return "?" + prefix + suffix;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index a8aaa68..4a3940c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -292,7 +292,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
long leftSize = estimateSizeRecursive(context, leftLineage);
long rightSize = estimateSizeRecursive(context, rightLineage);
- final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_INNER_HASH_JOIN_THRESHOLD);
+ final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);
boolean hashJoin = false;
if (leftSize < threshold || rightSize < threshold) {
@@ -363,7 +363,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
- if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.INMEMORY_OUTER_HASH_AGGREGATION_THRESHOLD)) {
+ if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
// we can implement left outer join using hash join, using the right operand as the build relation
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
@@ -381,7 +381,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
// blocking, but merge join is blocking as well)
String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
long outerSize = estimateSizeRecursive(context, outerLineage4);
- if (outerSize < conf.getLongVar(TajoConf.ConfVars.INMEMORY_OUTER_HASH_AGGREGATION_THRESHOLD)){
+ if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
} else {
@@ -701,7 +701,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
long estimatedSize = estimateSizeRecursive(context, outerLineage);
- final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_AGGREGATION_THRESHOLD);
+ final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
// if the relation size is less than the threshold,
// the hash aggregation will be used.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index e540570..37f0f9e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -171,7 +171,7 @@ public class GlobalPlanner {
TableMeta leftMeta = leftScan.getTableDesc().getMeta();
TableMeta rightMeta = rightScan.getTableDesc().getMeta();
- long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.BROADCAST_JOIN_THRESHOLD);
+ long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
if (leftMeta.getStat().getNumBytes() < broadcastThreshold) {
leftBroadcasted = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 19fde88..12bd30b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -50,7 +50,7 @@ public class ExternalSortExec extends SortExec {
super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
this.plan = plan;
- this.MEM_TUPLE_NUM = context.getConf().getIntVar(ConfVars.EXTENAL_SORT_BUFFER_NUM);
+ this.MEM_TUPLE_NUM = context.getConf().getIntVar(ConfVars.EXECUTOR_SORT_EXTENAL_BUFFER_SIZE);
this.tupleSlots = new ArrayList<Tuple>(MEM_TUPLE_NUM);
this.sortTmpDir = new Path(context.getWorkDir(), UUID.randomUUID().toString());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 9d6b4ff..c3eb5f7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -241,7 +241,7 @@ public class GlobalEngine extends AbstractService {
}
if(!createTable.isExternal()){
- Path tablePath = new Path(sm.getTableBaseDir(), createTable.getTableName().toLowerCase());
+ Path tablePath = new Path(sm.getWarehouseDir(), createTable.getTableName().toLowerCase());
createTable.setPath(tablePath);
} else {
Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
@@ -353,7 +353,7 @@ public class GlobalEngine extends AbstractService {
StoreTableNode storeTableNode = plan.getRootBlock().getStoreTableNode();
String tableName = storeTableNode.getTableName();
queryContext.setOutputTable(tableName);
- queryContext.setOutputPath(new Path(TajoConf.getWarehousePath(context.getConf()), tableName));
+ queryContext.setOutputPath(new Path(TajoConf.getWarehouseDir(context.getConf()), tableName));
queryContext.setCreateTable();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 6fba7f6..f4118bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -22,6 +22,7 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -46,8 +48,8 @@ import org.apache.tajo.engine.function.InCountry;
import org.apache.tajo.engine.function.builtin.*;
import org.apache.tajo.engine.function.string.*;
import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.master.rm.YarnTajoResourceManager;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.util.CommonTestingUtil;
@@ -60,6 +62,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -72,21 +75,21 @@ public class TajoMaster extends CompositeService {
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
- final public static FsPermission SYSTEM_RESOURCE_DIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission SYSTEM_RESOURCE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission WAREHOUSE_DIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission WAREHOUSE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission STAGING_ROOTDIR_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission STAGING_ROOTDIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
- final public static FsPermission SYSTEM_CONF_FILE_PERMISSION = FsPermission.createImmutable((short) 0644);
+ final public static FsPermission SYSTEM_CONF_FILE_PERMISSION = FsPermission.createImmutable((short) 0755);
private MasterContext context;
@@ -118,11 +121,11 @@ public class TajoMaster extends CompositeService {
}
public String getMasterName() {
- return tajoMasterService.getBindAddress().getHostName() + ":" + tajoMasterService.getBindAddress().getPort();
+ return NetUtils.normalizeInetSocketAddress(tajoMasterService.getBindAddress());
}
public String getVersion() {
- return "0.2.0";
+ return TajoConstants.TAJO_VERSION;
}
public TajoMasterClientService getTajoMasterClientService() {
@@ -176,7 +179,7 @@ public class TajoMaster extends CompositeService {
private void initResourceManager() throws Exception {
Class<WorkerResourceManager> resourceManagerClass = (Class<WorkerResourceManager>)
- systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, YarnTajoResourceManager.class);
+ systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, TajoWorkerResourceManager.class);
Constructor<WorkerResourceManager> constructor = resourceManagerClass.getConstructor(MasterContext.class);
resourceManager = constructor.newInstance(context);
resourceManager.init(context.getConf());
@@ -184,8 +187,8 @@ public class TajoMaster extends CompositeService {
private void initWebServer() throws Exception {
if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
- int httpPort = systemConf.getInt("tajo.master.http.port", 8080);
- webServer = StaticHttpServer.getInstance(this ,"admin", null, httpPort ,
+ InetSocketAddress address = systemConf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
+ webServer = StaticHttpServer.getInstance(this ,"admin", address.getHostName(), address.getPort(),
true, null, context.getConf(), null);
webServer.addServlet("queryServlet", "/query_exec", QueryExecutorServlet.class);
webServer.start();
@@ -194,12 +197,12 @@ public class TajoMaster extends CompositeService {
private void checkAndInitializeSystemDirectories() throws IOException {
// Get Tajo root dir
- this.tajoRootPath = TajoConf.getTajoRootPath(systemConf);
+ this.tajoRootPath = TajoConf.getTajoRootDir(systemConf);
LOG.info("Tajo Root Directory: " + tajoRootPath);
// Check and Create Tajo root dir
this.defaultFS = tajoRootPath.getFileSystem(systemConf);
- systemConf.set("fs.defaultFS", defaultFS.getUri().toString());
+ systemConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
LOG.info("FileSystem (" + this.defaultFS.getUri() + ") is initialized.");
if (!defaultFS.exists(tajoRootPath)) {
defaultFS.mkdirs(tajoRootPath, new FsPermission(TAJO_ROOT_DIR_PERMISSION));
@@ -207,20 +210,20 @@ public class TajoMaster extends CompositeService {
}
// Check and Create system and system resource dir
- Path systemPath = TajoConf.getSystemPath(systemConf);
+ Path systemPath = TajoConf.getSystemDir(systemConf);
if (!defaultFS.exists(systemPath)) {
defaultFS.mkdirs(systemPath, new FsPermission(SYSTEM_DIR_PERMISSION));
LOG.info("System dir '" + systemPath + "' is created");
}
- Path systemResourcePath = TajoConf.getSystemResourcePath(systemConf);
+ Path systemResourcePath = TajoConf.getSystemResourceDir(systemConf);
if (!defaultFS.exists(systemResourcePath)) {
defaultFS.mkdirs(systemResourcePath, new FsPermission(SYSTEM_RESOURCE_DIR_PERMISSION));
LOG.info("System resource dir '" + systemResourcePath + "' is created");
}
// Get Warehouse dir
- this.wareHousePath = TajoConf.getWarehousePath(systemConf);
- LOG.info("Tajo Warehouse Dir: " + wareHousePath);
+ this.wareHousePath = TajoConf.getWarehouseDir(systemConf);
+ LOG.info("Tajo Warehouse dir: " + wareHousePath);
// Check and Create Warehouse dir
if (!defaultFS.exists(wareHousePath)) {
@@ -228,9 +231,11 @@ public class TajoMaster extends CompositeService {
LOG.info("Warehouse dir '" + wareHousePath + "' is created");
}
- Path stagingPath = TajoConf.getStagingRoot(systemConf);
+ Path stagingPath = TajoConf.getStagingDir(systemConf);
+ LOG.info("Staging dir: " + wareHousePath);
if (!defaultFS.exists(stagingPath)) {
defaultFS.mkdirs(stagingPath, new FsPermission(STAGING_ROOTDIR_PERMISSION));
+ LOG.info("Staging dir '" + stagingPath + "' is created");
}
}
@@ -420,11 +425,16 @@ public class TajoMaster extends CompositeService {
private void writeSystemConf() throws IOException {
// Storing the system configs
- Path systemResourcePath = TajoConf.getSystemResourcePath(systemConf);
- Path systemConfPath = new Path(systemResourcePath, "system_conf.xml");
- systemConf.setVar(ConfVars.SYSTEM_CONF_PATH, systemConfPath.toUri().toString());
+ Path systemConfPath = TajoConf.getSystemConfPath(systemConf);
+
+ if (!defaultFS.exists(systemConfPath.getParent())) {
+ defaultFS.mkdirs(systemConfPath.getParent());
+ }
+
+ if (defaultFS.exists(systemConfPath)) {
+ defaultFS.delete(systemConfPath, false);
+ }
- defaultFS.delete(systemConfPath, true);
FSDataOutputStream out = FileSystem.create(defaultFS, systemConfPath,
new FsPermission(SYSTEM_CONF_FILE_PERMISSION));
try {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 1ed0f54..6b7d8d3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -81,7 +81,7 @@ public class TajoMasterClientService extends AbstractService {
public void start() {
// start the rpc server
- String confClientServiceAddr = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
+ String confClientServiceAddr = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
try {
server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa);
@@ -90,7 +90,7 @@ public class TajoMasterClientService extends AbstractService {
}
server.start();
bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+ this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
super.start();
}
@@ -287,26 +287,63 @@ public class TajoMasterClientService extends AbstractService {
String name = request.getTableName();
if (catalog.existsTable(name)) {
return TableResponse.newBuilder()
+ .setResultCode(ResultCode.OK)
.setTableDesc((TableDescProto) catalog.getTableDesc(name).getProto())
.build();
} else {
- return null;
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.ERROR)
+ .setErrorMessage("No such a table: " + request.getTableName())
+ .build();
}
}
@Override
- public TableResponse createTable(RpcController controller, CreateTableRequest request)
+ public TableResponse createExternalTable(RpcController controller, CreateTableRequest request)
throws ServiceException {
- Path path = new Path(request.getPath());
- TableMeta meta = new TableMetaImpl(request.getMeta());
- TableDesc desc;
try {
- desc = context.getGlobalEngine().createTableOnDirectory(request.getName(), meta, path, false);
- } catch (Exception e) {
- return TableResponse.newBuilder().setErrorMessage(e.getMessage()).build();
- }
+ Path path = new Path(request.getPath());
+ FileSystem fs = path.getFileSystem(conf);
+
+ if (!fs.exists(path)) {
+ throw new IOException("No such a directory: " + path);
+ }
+
+ TableMeta meta = new TableMetaImpl(request.getMeta());
+
+ if (meta.getStat() == null) {
+ meta.setStat(new TableStat());
+ }
+
+ TableStat stat = meta.getStat();
+ long totalSize;
+ try {
+ totalSize = fs.getContentSummary(path).getSpaceConsumed();
+ } catch (IOException e) {
+ String message =
+ "Cannot get the volume of the table \"" + request.getName() + "\" from " + request.getPath();
+ LOG.warn(message);
+ throw new IOException(message, e);
+ }
+ stat.setNumBytes(totalSize);
- return TableResponse.newBuilder().setTableDesc((TableDescProto) desc.getProto()).build();
+ TableDesc desc;
+ try {
+ desc = context.getGlobalEngine().createTableOnDirectory(request.getName(), meta, path, false);
+ } catch (Exception e) {
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.ERROR)
+ .setErrorMessage(e.getMessage()).build();
+ }
+
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.OK)
+ .setTableDesc((TableDescProto) desc.getProto()).build();
+ } catch (IOException ioe) {
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.ERROR)
+ .setErrorMessage(ioe.getMessage()).build();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 4b27e46..04b562c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -58,7 +58,7 @@ public class TajoMasterService extends AbstractService {
@Override
public void start() {
- String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS);
+ String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
try {
server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
@@ -67,7 +67,7 @@ public class TajoMasterService extends AbstractService {
}
server.start();
bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS,
+ this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
NetUtils.normalizeInetSocketAddress(bindAddress));
LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
super.start();
@@ -139,7 +139,7 @@ public class TajoMasterService extends AbstractService {
WorkerResource workerResource = new WorkerResource();
String[] tokens = eachWorkerResource.getWorkerHostAndPort().split(":");
workerResource.setAllocatedHost(tokens[0]);
- workerResource.setManagerPort(Integer.parseInt(tokens[1]));
+ workerResource.setPeerRpcPort(Integer.parseInt(tokens[1]));
workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
index c4fd8b8..5117700 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
@@ -296,7 +296,7 @@ public class YarnContainerProxy extends ContainerProxy {
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
FileSystem fs = null;
FileContext fsCtx = null;
- LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
+ LOG.info("defaultFS: " + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
try {
fs = FileSystem.get(conf);
fsCtx = FileContext.getFileContext(conf);
@@ -305,7 +305,7 @@ public class YarnContainerProxy extends ContainerProxy {
}
try {
- Path systemConfPath = TajoConf.getSystemConf(conf);
+ Path systemConfPath = TajoConf.getSystemConfPath(conf);
if (!fs.exists(systemConfPath)) {
LOG.error("system_conf.xml (" + systemConfPath.toString() + ") Not Found");
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
index c07d759..c8792ca 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -68,7 +68,7 @@ public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskR
this.context = context;
this.yarnRPC = yarnRPC;
executorService = Executors.newFixedThreadPool(
- context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+ context.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
}
public void start() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index 89084d1..f89a017 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -66,7 +66,7 @@ public class QueryInfo {
if(queryMasterResource == null) {
return 0;
}
- return queryMasterResource.getManagerPort();
+ return queryMasterResource.getPeerRpcPort();
}
public int getQueryMasterClientPort() {
@@ -122,6 +122,6 @@ public class QueryInfo {
@Override
public String toString() {
- return queryId.toString() + "state=" + queryState +", queryMaster=" + queryMasterResource;
+ return queryId.toString() + "state=" + queryState +",progress=" + progress + ", queryMaster=" + queryMasterResource;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index faa5a5c..f5f029f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -171,13 +171,14 @@ public class QueryJobManager extends CompositeService {
if(queryHeartbeat.getTajoWorkerHost() != null) {
WorkerResource queryMasterResource = new WorkerResource();
queryMasterResource.setAllocatedHost(queryHeartbeat.getTajoWorkerHost());
- queryMasterResource.setManagerPort(queryHeartbeat.getTajoWorkerPort());
+ queryMasterResource.setPeerRpcPort(queryHeartbeat.getTajoWorkerPort());
queryMasterResource.setClientPort(queryHeartbeat.getTajoWorkerClientPort());
queryMasterResource.setPullServerPort(queryHeartbeat.getTajoWorkerPullServerPort());
queryInfo.setQueryMasterResource(queryMasterResource);
}
queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
queryInfo.setQueryState(queryHeartbeat.getState());
+ queryInfo.setProgress(queryHeartbeat.getQueryProgress());
if (queryHeartbeat.hasQueryFinishTime()) {
queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index fa6790d..2860b17 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -53,7 +53,7 @@ import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
public class QueryMaster extends CompositeService implements EventHandler {
private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
- private static int QUERY_SESSION_TIMEOUT = 60 * 1000; //60 sec
+ private int querySessionTimeout;
private Clock clock;
@@ -91,7 +91,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
try {
systemConf = (TajoConf)conf;
- QUERY_SESSION_TIMEOUT = 60 * 1000;
+ querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
queryMasterContext = new QueryMasterContext(systemConf);
clock = new SystemClock();
@@ -368,7 +368,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
try {
long lastHeartbeat = eachTask.getLastClientHeartbeat();
long time = System.currentTimeMillis() - lastHeartbeat;
- if(lastHeartbeat > 0 && time > QUERY_SESSION_TIMEOUT) {
+ if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) {
LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
eachTask.expiredSessionTimeout();
}
@@ -383,8 +383,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
class FinishedQueryMasterTaskCleanThread extends Thread {
public void run() {
- int expireIntervalTime = systemConf.getInt("tajo.worker.history.expire.interval.min", 12 * 60); //12 hour
- LOG.info("FinishedQueryMasterTaskCleanThread started: expireIntervalTime=" + expireIntervalTime);
+ int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
+ LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
while(!queryMasterStop.get()) {
try {
Thread.sleep(60 * 1000 * 60); // hourly
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index ae1508c..26cba45 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -113,8 +113,7 @@ public class QueryMasterTask extends CompositeService {
try {
queryTaskContext = new QueryMasterTaskContext();
- String resourceManagerClassName = conf.get("tajo.resource.manager",
- TajoWorkerResourceManager.class.getCanonicalName());
+ String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
resourceAllocator = new TajoResourceAllocator(queryTaskContext);
@@ -307,7 +306,7 @@ public class QueryMasterTask extends CompositeService {
ugi = UserGroupInformation.getLoginUser();
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
- FileSystem defaultFS = FileSystem.get(systemConf);
+ FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
Path stagingDir = null;
Path outputDir = null;
@@ -316,7 +315,7 @@ public class QueryMasterTask extends CompositeService {
// Create Output Directory
////////////////////////////////////////////
- stagingDir = new Path(TajoConf.getStagingRoot(systemConf), queryId.toString());
+ stagingDir = new Path(TajoConf.getStagingDir(systemConf), queryId.toString());
if (defaultFS.exists(stagingDir)) {
throw new IOException("The staging directory '" + stagingDir + "' already exists");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 9b9c63f..f01cb75 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -273,7 +273,6 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
@Override
public void transition(QueryUnitAttempt queryUnitAttempt,
TaskAttemptEvent taskAttemptEvent) {
- LOG.info(">>>>>>>>> Already Assigned: " + queryUnitAttempt.getId());
}
}
@@ -283,7 +282,6 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
@Override
public void transition(QueryUnitAttempt queryUnitAttempt,
TaskAttemptEvent taskAttemptEvent) {
- LOG.info(">>>>>>>>> Already Done: " + queryUnitAttempt.getId());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d5128328/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 10795ea..178e9b5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -161,7 +161,7 @@ public class Repartitioner {
// of a larger table
int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
int desireJoinTaskVolumn = subQuery.getContext().getConf().
- getIntVar(ConfVars.JOIN_TASK_VOLUME);
+ getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
// calculate the number of tasks according to the data size
int mb = (int) Math.ceil((double)stats[largerIdx].getNumBytes() / 1048576);