You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:17:31 UTC
[14/16] tajo git commit: TAJO-1288: Refactoring
org.apache.tajo.master package.
TAJO-1288: Refactoring org.apache.tajo.master package.
Closes #338
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1c29c1cb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1c29c1cb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1c29c1cb
Branch: refs/heads/index_support
Commit: 1c29c1cb4bd0e2d75954575717cb5cf05875fe51
Parents: a1e0328
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 00:31:54 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 00:31:54 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../apache/tajo/engine/query/QueryContext.java | 2 +-
.../main/java/org/apache/tajo/ha/HAService.java | 56 +
.../org/apache/tajo/ha/HAServiceHDFSImpl.java | 316 +++++
.../java/org/apache/tajo/ha/TajoMasterInfo.java | 89 ++
.../tajo/master/AbstractTaskScheduler.java | 56 -
.../org/apache/tajo/master/ContainerProxy.java | 2 +-
.../tajo/master/DefaultTaskScheduler.java | 928 ------------
.../apache/tajo/master/FetchScheduleEvent.java | 40 -
.../org/apache/tajo/master/FragmentPair.java | 73 -
.../org/apache/tajo/master/GlobalEngine.java | 2 +-
.../NonForwardQueryResultFileScanner.java | 164 ---
.../master/NonForwardQueryResultScanner.java | 46 -
.../NonForwardQueryResultSystemScanner.java | 616 --------
.../java/org/apache/tajo/master/QueryInfo.java | 235 +++
.../org/apache/tajo/master/QueryJobManager.java | 311 ++++
.../apache/tajo/master/ScheduledFetches.java | 49 -
.../apache/tajo/master/TajoContainerProxy.java | 2 +-
.../java/org/apache/tajo/master/TajoMaster.java | 11 +-
.../tajo/master/TajoMasterClientService.java | 13 +-
.../apache/tajo/master/TajoMasterService.java | 2 -
.../tajo/master/TaskSchedulerContext.java | 65 -
.../tajo/master/TaskSchedulerFactory.java | 69 -
.../tajo/master/event/QueryCompletedEvent.java | 2 +-
.../tajo/master/event/QueryStartEvent.java | 2 +-
.../tajo/master/event/StageCompletedEvent.java | 2 +-
.../event/TaskAttemptToSchedulerEvent.java | 2 +-
.../apache/tajo/master/exec/DDLExecutor.java | 1 -
.../exec/NonForwardQueryResultFileScanner.java | 164 +++
.../exec/NonForwardQueryResultScanner.java | 46 +
.../NonForwardQueryResultSystemScanner.java | 616 ++++++++
.../apache/tajo/master/exec/QueryExecutor.java | 9 +-
.../org/apache/tajo/master/ha/HAService.java | 56 -
.../tajo/master/ha/HAServiceHDFSImpl.java | 318 -----
.../apache/tajo/master/ha/TajoMasterInfo.java | 89 --
.../master/metrics/CatalogMetricsGaugeSet.java | 56 -
.../metrics/WorkerResourceMetricsGaugeSet.java | 74 -
.../apache/tajo/master/querymaster/Query.java | 738 ----------
.../master/querymaster/QueryInProgress.java | 300 ----
.../tajo/master/querymaster/QueryInfo.java | 235 ---
.../tajo/master/querymaster/QueryJobEvent.java | 45 -
.../master/querymaster/QueryJobManager.java | 310 ----
.../tajo/master/querymaster/QueryMaster.java | 631 --------
.../querymaster/QueryMasterManagerService.java | 263 ----
.../master/querymaster/QueryMasterRunner.java | 149 --
.../master/querymaster/QueryMasterTask.java | 638 ---------
.../tajo/master/querymaster/Repartitioner.java | 1251 ----------------
.../apache/tajo/master/querymaster/Stage.java | 1342 ------------------
.../tajo/master/querymaster/StageState.java | 30 -
.../apache/tajo/master/querymaster/Task.java | 907 ------------
.../tajo/master/querymaster/TaskAttempt.java | 443 ------
.../master/rm/TajoWorkerResourceManager.java | 3 +-
.../tajo/master/rm/WorkerResourceManager.java | 2 +-
.../master/scheduler/QuerySchedulingInfo.java | 55 +
.../apache/tajo/master/scheduler/Scheduler.java | 41 +
.../master/scheduler/SchedulingAlgorithms.java | 47 +
.../master/scheduler/SimpleFifoScheduler.java | 147 ++
.../master/session/InvalidSessionException.java | 25 -
.../session/NoSuchSessionVariableException.java | 25 -
.../org/apache/tajo/master/session/Session.java | 196 ---
.../tajo/master/session/SessionConstants.java | 23 -
.../tajo/master/session/SessionEvent.java | 34 -
.../tajo/master/session/SessionEventType.java | 24 -
.../session/SessionLivelinessMonitor.java | 53 -
.../tajo/master/session/SessionManager.java | 144 --
.../tajo/metrics/CatalogMetricsGaugeSet.java | 56 +
.../metrics/WorkerResourceMetricsGaugeSet.java | 74 +
.../tajo/querymaster/AbstractTaskScheduler.java | 56 +
.../tajo/querymaster/DefaultTaskScheduler.java | 926 ++++++++++++
.../tajo/querymaster/FetchScheduleEvent.java | 40 +
.../java/org/apache/tajo/querymaster/Query.java | 738 ++++++++++
.../tajo/querymaster/QueryInProgress.java | 301 ++++
.../apache/tajo/querymaster/QueryJobEvent.java | 46 +
.../apache/tajo/querymaster/QueryMaster.java | 631 ++++++++
.../querymaster/QueryMasterManagerService.java | 262 ++++
.../tajo/querymaster/QueryMasterTask.java | 638 +++++++++
.../apache/tajo/querymaster/Repartitioner.java | 1250 ++++++++++++++++
.../java/org/apache/tajo/querymaster/Stage.java | 1342 ++++++++++++++++++
.../org/apache/tajo/querymaster/StageState.java | 30 +
.../java/org/apache/tajo/querymaster/Task.java | 897 ++++++++++++
.../apache/tajo/querymaster/TaskAttempt.java | 443 ++++++
.../tajo/querymaster/TaskSchedulerContext.java | 65 +
.../tajo/querymaster/TaskSchedulerFactory.java | 68 +
.../tajo/scheduler/QuerySchedulingInfo.java | 55 -
.../org/apache/tajo/scheduler/Scheduler.java | 41 -
.../tajo/scheduler/SchedulingAlgorithms.java | 47 -
.../tajo/scheduler/SimpleFifoScheduler.java | 147 --
.../tajo/session/InvalidSessionException.java | 25 +
.../session/NoSuchSessionVariableException.java | 25 +
.../java/org/apache/tajo/session/Session.java | 196 +++
.../apache/tajo/session/SessionConstants.java | 23 +
.../org/apache/tajo/session/SessionEvent.java | 34 +
.../apache/tajo/session/SessionEventType.java | 24 +
.../tajo/session/SessionLivelinessMonitor.java | 53 +
.../org/apache/tajo/session/SessionManager.java | 144 ++
.../main/java/org/apache/tajo/util/JSPUtil.java | 10 +-
.../apache/tajo/util/history/HistoryReader.java | 2 +-
.../apache/tajo/util/history/HistoryWriter.java | 2 +-
.../java/org/apache/tajo/worker/FetchImpl.java | 4 +-
.../tajo/worker/TajoResourceAllocator.java | 6 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 6 +-
.../tajo/worker/TajoWorkerClientService.java | 2 +-
tajo-core/src/main/resources/tajo-default.xml | 2 +-
.../resources/webapps/admin/catalogview.jsp | 2 +-
.../main/resources/webapps/admin/cluster.jsp | 4 +-
.../src/main/resources/webapps/admin/index.jsp | 6 +-
.../src/main/resources/webapps/admin/query.jsp | 4 +-
.../resources/webapps/admin/query_executor.jsp | 2 +-
.../src/main/resources/webapps/worker/index.jsp | 4 +-
.../resources/webapps/worker/querydetail.jsp | 4 +-
.../main/resources/webapps/worker/queryplan.jsp | 6 +-
.../resources/webapps/worker/querytasks.jsp | 2 +-
.../src/main/resources/webapps/worker/task.jsp | 8 +-
.../apache/tajo/LocalTajoTestingUtility.java | 2 +-
.../org/apache/tajo/TajoTestingCluster.java | 8 +-
.../tajo/engine/planner/TestLogicalPlanner.java | 2 +-
.../planner/physical/TestPhysicalPlanner.java | 2 +-
.../tajo/engine/query/TestGroupByQuery.java | 8 +-
.../tajo/engine/query/TestJoinBroadcast.java | 2 +-
.../tajo/engine/query/TestTablePartitions.java | 2 +-
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 153 ++
.../TestNonForwardQueryResultSystemScanner.java | 4 +-
.../apache/tajo/master/TestRepartitioner.java | 8 +-
.../tajo/master/ha/TestHAServiceHDFSImpl.java | 158 ---
.../querymaster/TestIntermediateEntry.java | 53 -
.../tajo/master/querymaster/TestKillQuery.java | 125 --
.../master/querymaster/TestQueryProgress.java | 75 -
.../querymaster/TestTaskStatusUpdate.java | 194 ---
.../master/scheduler/TestFifoScheduler.java | 116 ++
.../tajo/querymaster/TestIntermediateEntry.java | 53 +
.../apache/tajo/querymaster/TestKillQuery.java | 125 ++
.../tajo/querymaster/TestQueryProgress.java | 75 +
.../tajo/querymaster/TestTaskStatusUpdate.java | 194 +++
.../tajo/scheduler/TestFifoScheduler.java | 116 --
.../java/org/apache/tajo/util/TestJSPUtil.java | 2 +-
.../util/history/TestHistoryWriterReader.java | 2 +-
.../org/apache/tajo/worker/TestHistory.java | 2 +-
tajo-dist/pom.xml | 8 +-
138 files changed, 11317 insertions(+), 11612 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7783db8..96b63ea 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1288: Refactoring org.apache.tajo.master package. (hyunsik)
+
TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events.
(jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 493ca6e..7b3c00d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -25,7 +25,7 @@ import org.apache.tajo.QueryVars;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
import org.apache.tajo.plan.logical.NodeType;
import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
new file mode 100644
index 0000000..1329223
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Coordinates TajoMaster high availability: it decides which TajoMaster is active at startup or
+ * whenever the active master changes (e.g., because it failed), and keeps track of the health of
+ * the registered masters.
+ */
+public interface HAService {
+
+  /**
+   * Registers this master's name in the shared storage.
+   *
+   * @throws IOException if an I/O error occurs while updating the shared storage
+   */
+  void register() throws IOException;
+
+  /**
+   * Removes this master's name from the shared storage.
+   *
+   * @throws IOException if an I/O error occurs while updating the shared storage
+   */
+  void delete() throws IOException;
+
+  /**
+   * @return {@code true} if this master is currently the active master
+   */
+  boolean isActiveStatus();
+
+  /**
+   * Lists every master known to the shared storage.
+   *
+   * @return information about all registered masters
+   * @throws IOException if an I/O error occurs while reading the shared storage
+   */
+  List<TajoMasterInfo> getMasters() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
new file mode 100644
index 0000000..e18a9b2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * An {@link HAService} implementation that keeps TajoMaster HA state in the file system holding
+ * the Tajo root directory (typically HDFS).
+ *
+ * <p>Each master registers itself by creating a file named {@code host_port} (its master name with
+ * ':' replaced by '_') either in the HA "active" sub-directory or in the HA "backup" sub-directory.
+ * The file body is one UTF string holding the client RPC, resource tracker RPC, catalog and web
+ * server addresses, each formatted as {@code ip:port} and joined with '_'.
+ *
+ * <p>Every registered master starts a background {@link PingChecker}. While this master is not the
+ * active one, the checker periodically pings the active master and, if it is dead, re-registers
+ * this master so that it can take over the active role.
+ */
+public class HAServiceHDFSImpl implements HAService {
+  private static final Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
+
+  private MasterContext context;
+  private TajoConf conf;
+
+  private FileSystem fs;
+
+  // This master's identity: "ip:port" of the TajoMaster service bind address.
+  private String masterName;
+  private Path rootPath;
+  private Path haPath;
+  private Path activePath;
+  private Path backupPath;
+
+  private boolean isActiveStatus = false;
+
+  // Thread which runs periodically to check whether the current active master is still alive.
+  private Thread checkerThread;
+  private volatile boolean stopped = false;
+
+  // Pause between two checks, passed to Thread.sleep (milliseconds).
+  private int monitorInterval;
+
+  // "ip:port" of the master currently believed to be active. Written by register() and read by
+  // the ping checker thread, hence volatile. It is null until the first registration completes.
+  private volatile String currentActiveMaster;
+
+  /**
+   * Creates the HA service and makes sure the HA directories exist.
+   *
+   * @param context master context providing the configuration and the bind address
+   * @throws IOException if the HA directories cannot be checked or created
+   */
+  public HAServiceHDFSImpl(MasterContext context) throws IOException {
+    this.context = context;
+    this.conf = context.getConf();
+    initSystemDirectory();
+
+    InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
+    this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
+
+    monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
+  }
+
+  /** Resolves the Tajo file system and creates the HA, active and backup directories if missing. */
+  private void initSystemDirectory() throws IOException {
+    // Get Tajo root dir
+    this.rootPath = TajoConf.getTajoRootDir(conf);
+
+    // Check Tajo root dir
+    this.fs = rootPath.getFileSystem(conf);
+
+    // Check and create Tajo system HA dir and its active/backup sub-directories
+    haPath = TajoConf.getSystemHADir(conf);
+    mkdirsIfAbsent(haPath, "System HA dir");
+
+    activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+    mkdirsIfAbsent(activePath, "System HA Active dir");
+
+    backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+    mkdirsIfAbsent(backupPath, "System HA Backup dir");
+  }
+
+  /** Creates {@code dir} with the system resource permission if it does not exist yet. */
+  private void mkdirsIfAbsent(Path dir, String description) throws IOException {
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+      LOG.info(description + " '" + dir + "' is created");
+    }
+  }
+
+  /** Starts the ping checker thread once; later calls are no-ops. */
+  private void startPingChecker() {
+    if (checkerThread == null) {
+      checkerThread = new Thread(new PingChecker());
+      checkerThread.setName("Ping Checker");
+      checkerThread.start();
+    }
+  }
+
+  @Override
+  public void register() throws IOException {
+    FileStatus[] files = fs.listStatus(activePath);
+
+    // Phase 1: If there is no other active master, this master tries to become the active master.
+    if (files.length == 0) {
+      createMasterFile(true);
+      currentActiveMaster = masterName;
+      LOG.info(String.format("This is added to active master (%s)", masterName));
+    } else {
+      // Phase 2: If there is active master information, we need to check its status.
+      Path activeFile = files[0].getPath();
+      currentActiveMaster = activeFile.getName().replaceAll("_", ":");
+
+      // Phase 3: If the current active master is dead, this master should become the active master.
+      if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
+        fs.delete(activeFile, true);
+        createMasterFile(true);
+        currentActiveMaster = masterName;
+        LOG.info(String.format("This is added to active master (%s)", masterName));
+      } else {
+        // Phase 4: If the current active master is alive, this master becomes a backup master.
+        createMasterFile(false);
+        LOG.info(String.format("This is added to backup masters (%s)", masterName));
+      }
+    }
+  }
+
+  /**
+   * Writes this master's address file into the active or backup directory, records the resulting
+   * status and starts the ping checker.
+   *
+   * @param isActive true to register in the active directory, false for the backup directory
+   */
+  private void createMasterFile(boolean isActive) throws IOException {
+    String fileName = masterName.replaceAll(":", "_");
+    Path path = new Path(isActive ? activePath : backupPath, fileName);
+
+    String contents = formatAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS) + "_"
+        + formatAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS) + "_"
+        + formatAddress(HAConstants.CATALOG_ADDRESS) + "_"
+        + formatAddress(HAConstants.MASTER_INFO_ADDRESS);
+
+    boolean written = false;
+    FSDataOutputStream out = fs.create(path);
+    try {
+      out.writeUTF(contents);
+      out.hflush();
+      written = true;
+    } catch (FileAlreadyExistsException e) {
+      // NOTE(review): FileSystem.create(Path) overwrites by default, so this is unlikely to be hit.
+      // Only fall back from the active to the backup directory; retrying a backup registration
+      // would recurse forever.
+      if (!isActive) {
+        throw e;
+      }
+      LOG.warn("Failed to register " + masterName + " as active master; registering as backup", e);
+    } finally {
+      // Release the stream even if writing failed.
+      out.close();
+    }
+
+    if (!written) {
+      // The backup registration records isActiveStatus and starts the ping checker itself;
+      // do not overwrite its status with this call's 'isActive'.
+      createMasterFile(false);
+      return;
+    }
+
+    isActiveStatus = isActive;
+    startPingChecker();
+  }
+
+  /** Formats the address of the given {@link HAConstants} type as {@code ip:port}. */
+  private String formatAddress(int type) {
+    InetSocketAddress address = getHostAddress(type);
+    return address.getAddress().getHostAddress() + ":" + address.getPort();
+  }
+
+  /**
+   * Returns this master's host combined with the configured port of the given address type.
+   *
+   * @param type one of the {@link HAConstants} address type constants
+   * @throws IllegalArgumentException if {@code type} is not a known address type
+   */
+  private InetSocketAddress getHostAddress(int type) {
+    TajoConf.ConfVars confVar;
+
+    switch (type) {
+      case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
+        confVar = TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS;
+        break;
+      case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
+        confVar = TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS;
+        break;
+      case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
+        confVar = TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS;
+        break;
+      case HAConstants.CATALOG_ADDRESS:
+        confVar = TajoConf.ConfVars.CATALOG_ADDRESS;
+        break;
+      case HAConstants.MASTER_INFO_ADDRESS:
+        confVar = TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS;
+        break;
+      default:
+        // Previously this fell through with a null address and failed with an NPE below.
+        throw new IllegalArgumentException("Unknown address type: " + type);
+    }
+
+    InetSocketAddress address = context.getConf().getSocketAddrVar(confVar);
+    // Advertise this master's own host together with the configured port.
+    return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
+  }
+
+  @Override
+  public void delete() throws IOException {
+    removeMasterFiles();
+    stopped = true;
+  }
+
+  /**
+   * Deletes this master's file from both the active and the backup directory and clears the
+   * active flag, without stopping the ping checker.
+   */
+  private void removeMasterFiles() throws IOException {
+    String fileName = masterName.replaceAll(":", "_");
+
+    Path activeFile = new Path(activePath, fileName);
+    if (fs.exists(activeFile)) {
+      fs.delete(activeFile, true);
+    }
+
+    Path backupFile = new Path(backupPath, fileName);
+    if (fs.exists(backupFile)) {
+      fs.delete(backupFile, true);
+    }
+
+    isActiveStatus = false;
+  }
+
+  @Override
+  public boolean isActiveStatus() {
+    return isActiveStatus;
+  }
+
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    List<TajoMasterInfo> list = TUtil.newList();
+
+    // Only a single active entry is reported; any other count is ignored here.
+    FileStatus[] files = fs.listStatus(activePath);
+    if (files.length == 1) {
+      list.add(createTajoMasterInfo(files[0].getPath(), true));
+    }
+
+    for (FileStatus status : fs.listStatus(backupPath)) {
+      list.add(createTajoMasterInfo(status.getPath(), false));
+    }
+
+    return list;
+  }
+
+  /**
+   * Builds a {@link TajoMasterInfo} from a master file written by {@code createMasterFile}.
+   *
+   * @param path master file whose name is the master's {@code host_port}
+   * @param isActive whether the file lives in the active directory
+   * @throws IOException if the file cannot be read or does not contain four addresses
+   */
+  private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
+    String masterAddress = path.getName().replaceAll("_", ":");
+    boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
+
+    String data;
+    FSDataInputStream stream = fs.open(path);
+    try {
+      data = stream.readUTF();
+    } finally {
+      // Release the stream even if reading failed.
+      stream.close();
+    }
+
+    String[] addresses = data.split("_");
+    if (addresses.length < 4) {
+      throw new IOException("Malformed master file " + path + ": '" + data + "'");
+    }
+
+    TajoMasterInfo info = new TajoMasterInfo();
+
+    info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
+    info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
+    info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
+    info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
+    info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
+
+    info.setAvailable(isAlive);
+    info.setActive(isActive);
+
+    return info;
+  }
+
+  /**
+   * Periodically pings the current active master while this master is not the active one. If the
+   * active master is dead and the active directory is empty or still names it, this master
+   * re-registers (and may take over the active role).
+   */
+  private class PingChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (HAServiceHDFSImpl.this) {
+          try {
+            // Null until register() has recorded the active master for the first time.
+            String activeMaster = currentActiveMaster;
+            if (activeMaster != null && !activeMaster.equals(masterName)) {
+              boolean isAlive = HAServiceUtil.isMasterAlive(activeMaster, conf);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("currentActiveMaster:" + activeMaster + ", thisMasterName:" + masterName
+                    + ", isAlive:" + isAlive);
+              }
+
+              // If the active master is dead, this master should become the active master
+              // instead of the previous active master.
+              if (!isAlive) {
+                FileStatus[] files = fs.listStatus(activePath);
+                if (files.length == 0 || (files.length == 1
+                    && activeMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
+                  // Do not call delete(): it sets 'stopped', which would end this loop even if
+                  // this master is registered as a backup again and must keep monitoring.
+                  removeMasterFiles();
+                  register();
+                }
+              }
+            }
+          } catch (Exception e) {
+            LOG.error("PingChecker failed to check the active master. - masterName:" + masterName, e);
+          }
+        }
+        try {
+          Thread.sleep(monitorInterval);
+        } catch (InterruptedException e) {
+          LOG.info("PingChecker interrupted. - masterName:" + masterName);
+          // Preserve the interrupt status for whoever owns this thread.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
new file mode 100644
index 0000000..c6fdd40
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ha;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Mutable value holder describing one registered TajoMaster: the addresses it publishes, whether
+ * it is available, and whether it is the active master.
+ */
+public class TajoMasterInfo {
+
+  // Status flags.
+  private boolean available;
+  private boolean isActive;
+
+  // Published endpoints.
+  private InetSocketAddress tajoMasterAddress;
+  private InetSocketAddress tajoClientAddress;
+  private InetSocketAddress workerResourceTrackerAddr;
+  private InetSocketAddress catalogAddress;
+  private InetSocketAddress webServerAddress;
+
+  /** @return whether this master was reported as available */
+  public boolean isAvailable() {
+    return this.available;
+  }
+
+  public void setAvailable(boolean flag) {
+    this.available = flag;
+  }
+
+  /** @return whether this master is the active master */
+  public boolean isActive() {
+    return this.isActive;
+  }
+
+  public void setActive(boolean flag) {
+    this.isActive = flag;
+  }
+
+  /** @return the TajoMaster service address */
+  public InetSocketAddress getTajoMasterAddress() {
+    return this.tajoMasterAddress;
+  }
+
+  public void setTajoMasterAddress(InetSocketAddress address) {
+    this.tajoMasterAddress = address;
+  }
+
+  /** @return the client RPC address */
+  public InetSocketAddress getTajoClientAddress() {
+    return this.tajoClientAddress;
+  }
+
+  public void setTajoClientAddress(InetSocketAddress address) {
+    this.tajoClientAddress = address;
+  }
+
+  /** @return the worker resource tracker address */
+  public InetSocketAddress getWorkerResourceTrackerAddr() {
+    return this.workerResourceTrackerAddr;
+  }
+
+  public void setWorkerResourceTrackerAddr(InetSocketAddress address) {
+    this.workerResourceTrackerAddr = address;
+  }
+
+  /** @return the catalog address */
+  public InetSocketAddress getCatalogAddress() {
+    return this.catalogAddress;
+  }
+
+  public void setCatalogAddress(InetSocketAddress address) {
+    this.catalogAddress = address;
+  }
+
+  /** @return the web server address */
+  public InetSocketAddress getWebServerAddress() {
+    return this.webServerAddress;
+  }
+
+  public void setWebServerAddress(InetSocketAddress address) {
+    this.webServerAddress = address;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
deleted file mode 100644
index 320a5aa..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.master.event.TaskSchedulerEvent;
-
-
-/**
- * Base class for task schedulers: a Hadoop {@link AbstractService} that also handles
- * {@link TaskSchedulerEvent}s and exposes assignment counters.
- */
-public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
-
-  // Assignment counters, updated by subclasses. Presumably the number of host-local,
-  // rack-local and total task assignments — TODO confirm against the implementations.
-  protected int hostLocalAssigned;
-  protected int rackLocalAssigned;
-  protected int totalAssigned;
-
-  /**
-   * Construct the service.
-   *
-   * @param name service name
-   */
-  public AbstractTaskScheduler(String name) {
-    super(name);
-  }
-
-  /** @return the current value of the host-local assignment counter */
-  public int getHostLocalAssigned() {
-    return hostLocalAssigned;
-  }
-
-  /** @return the current value of the rack-local assignment counter */
-  public int getRackLocalAssigned() {
-    return rackLocalAssigned;
-  }
-
-  /** @return the current value of the total assignment counter */
-  public int getTotalAssigned() {
-    return totalAssigned;
-  }
-
-  /** Handles a task request; the behavior is defined by the concrete scheduler. */
-  public abstract void handleTaskRequestEvent(TaskRequestEvent event);
-  // NOTE(review): presumably the number of objects still waiting to be scheduled — verify.
-  public abstract int remainingScheduledObjectNum();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
index 462de91..562790d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.master.container.TajoContainer;
import org.apache.tajo.master.container.TajoContainerId;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
deleted file mode 100644
index d47c93a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ /dev/null
@@ -1,928 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.TaskRequest;
-import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.TaskAttempt;
-import org.apache.tajo.plan.serder.LogicalNodeSerializer;
-import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.storage.DataLocation;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class DefaultTaskScheduler extends AbstractTaskScheduler {
- private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
-
- private final TaskSchedulerContext context;
- private Stage stage;
-
- private Thread schedulingThread;
- private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
-
- private ScheduledRequests scheduledRequests;
- private TaskRequests taskRequests;
-
- private int nextTaskId = 0;
- private int scheduledObjectNum = 0;
-
- public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) {
- super(DefaultTaskScheduler.class.getName());
- this.context = context;
- this.stage = stage;
- }
-
- @Override
- public void init(Configuration conf) {
-
- scheduledRequests = new ScheduledRequests();
- taskRequests = new TaskRequests();
-
- super.init(conf);
- }
-
- @Override
- public void start() {
- LOG.info("Start TaskScheduler");
-
- this.schedulingThread = new Thread() {
- public void run() {
-
- while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
- try {
- synchronized (schedulingThread){
- schedulingThread.wait(100);
- }
- schedule();
- } catch (InterruptedException e) {
- break;
- } catch (Throwable e) {
- LOG.fatal(e.getMessage(), e);
- break;
- }
- }
- LOG.info("TaskScheduler schedulingThread stopped");
- }
- };
-
- this.schedulingThread.start();
- super.start();
- }
-
- private static final TaskAttemptId NULL_ATTEMPT_ID;
- public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
- static {
- ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
- NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0);
-
- TajoWorkerProtocol.TaskRequestProto.Builder builder =
- TajoWorkerProtocol.TaskRequestProto.newBuilder();
- builder.setId(NULL_ATTEMPT_ID.getProto());
- builder.setShouldDie(true);
- builder.setOutputTable("");
- builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
- builder.setClusteredOutput(false);
- stopTaskRunnerReq = builder.build();
- }
-
- @Override
- public void stop() {
- if(stopEventHandling.getAndSet(true)){
- return;
- }
-
- if (schedulingThread != null) {
- synchronized (schedulingThread) {
- schedulingThread.notifyAll();
- }
- }
-
- // Return all of request callbacks instantly.
- if(taskRequests != null){
- for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
- req.getCallback().run(stopTaskRunnerReq);
- }
- }
-
- LOG.info("Task Scheduler stopped");
- super.stop();
- }
-
- private Fragment[] fragmentsForNonLeafTask;
- private Fragment[] broadcastFragmentsForNonLeafTask;
-
- LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>();
- public void schedule() {
-
- if (taskRequests.size() > 0) {
- if (scheduledRequests.leafTaskNum() > 0) {
- LOG.debug("Try to schedule tasks with taskRequestEvents: " +
- taskRequests.size() + ", LeafTask Schedule Request: " +
- scheduledRequests.leafTaskNum());
- taskRequests.getTaskRequests(taskRequestEvents,
- scheduledRequests.leafTaskNum());
- LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
- if (taskRequestEvents.size() > 0) {
- scheduledRequests.assignToLeafTasks(taskRequestEvents);
- taskRequestEvents.clear();
- }
- }
- }
-
- if (taskRequests.size() > 0) {
- if (scheduledRequests.nonLeafTaskNum() > 0) {
- LOG.debug("Try to schedule tasks with taskRequestEvents: " +
- taskRequests.size() + ", NonLeafTask Schedule Request: " +
- scheduledRequests.nonLeafTaskNum());
- taskRequests.getTaskRequests(taskRequestEvents,
- scheduledRequests.nonLeafTaskNum());
- scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
- taskRequestEvents.clear();
- }
- }
- }
-
- @Override
- public void handle(TaskSchedulerEvent event) {
- if (event.getType() == EventType.T_SCHEDULE) {
- if (event instanceof FragmentScheduleEvent) {
- FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
- if (context.isLeafQuery()) {
- TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext();
- Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
- task.addFragment(castEvent.getLeftFragment(), true);
- scheduledObjectNum++;
- if (castEvent.hasRightFragments()) {
- task.addFragments(castEvent.getRightFragments());
- }
- stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
- } else {
- fragmentsForNonLeafTask = new FileFragment[2];
- fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
- if (castEvent.hasRightFragments()) {
- FileFragment[] rightFragments = castEvent.getRightFragments().toArray(new FileFragment[]{});
- fragmentsForNonLeafTask[1] = rightFragments[0];
- if (rightFragments.length > 1) {
- broadcastFragmentsForNonLeafTask = new FileFragment[rightFragments.length - 1];
- System.arraycopy(rightFragments, 1, broadcastFragmentsForNonLeafTask, 0, broadcastFragmentsForNonLeafTask.length);
- } else {
- broadcastFragmentsForNonLeafTask = null;
- }
- }
- }
- } else if (event instanceof FetchScheduleEvent) {
- FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
- Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
- TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
- Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
- scheduledObjectNum++;
- for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
- task.addFetches(eachFetch.getKey(), eachFetch.getValue());
- task.addFragment(fragmentsForNonLeafTask[0], true);
- if (fragmentsForNonLeafTask[1] != null) {
- task.addFragment(fragmentsForNonLeafTask[1], true);
- }
- }
- if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) {
- task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask));
- }
- stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
- } else if (event instanceof TaskAttemptToSchedulerEvent) {
- TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
- if (context.isLeafQuery()) {
- scheduledRequests.addLeafTask(castEvent);
- } else {
- scheduledRequests.addNonLeafTask(castEvent);
- }
- }
- } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
- // when a stage is killed, unassigned query unit attmpts are canceled from the scheduler.
- // This event is triggered by TaskAttempt.
- TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event;
- scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId());
- LOG.info(castedEvent.getTaskAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
- ((TaskAttemptToSchedulerEvent) event).getTaskAttempt().handle(
- new TaskAttemptEvent(castedEvent.getTaskAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
- }
- }
-
- @Override
- public void handleTaskRequestEvent(TaskRequestEvent event) {
-
- taskRequests.handle(event);
- int hosts = scheduledRequests.leafTaskHostMapping.size();
-
- // if available cluster resource are large then tasks, the scheduler thread are working immediately.
- if(remainingScheduledObjectNum() > 0 &&
- (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){
- synchronized (schedulingThread){
- schedulingThread.notifyAll();
- }
- }
- }
-
- @Override
- public int remainingScheduledObjectNum() {
- return scheduledObjectNum;
- }
-
- private class TaskRequests implements EventHandler<TaskRequestEvent> {
- private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
- new LinkedBlockingQueue<TaskRequestEvent>();
-
- @Override
- public void handle(TaskRequestEvent event) {
- if(LOG.isDebugEnabled()){
- LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
- }
-
- if(stopEventHandling.get()) {
- event.getCallback().run(stopTaskRunnerReq);
- return;
- }
- int qSize = taskRequestQueue.size();
- if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
- }
- int remCapacity = taskRequestQueue.remainingCapacity();
- if (remCapacity < 1000) {
- LOG.warn("Very low remaining capacity in the event-queue "
- + "of DefaultTaskScheduler: " + remCapacity);
- }
-
- taskRequestQueue.add(event);
- }
-
- public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
- int num) {
- taskRequestQueue.drainTo(taskRequests, num);
- }
-
- public int size() {
- return taskRequestQueue.size();
- }
- }
-
- /**
- * One worker can have multiple running task runners. <code>HostVolumeMapping</code>
- * describes various information for one worker, including :
- * <ul>
- * <li>host name</li>
- * <li>rack name</li>
- * <li>unassigned tasks for each disk volume</li>
- * <li>last assigned volume id - it can be used for assigning task in a round-robin manner</li>
- * <li>the number of running tasks for each volume</li>
- * </ul>, each task runner and the concurrency number of running tasks for volumes.
- *
- * Here, we identifier a task runner by {@link ContainerId}, and we use volume ids to identify
- * all disks in this node. Actually, each volume is only used to distinguish disks, and we don't
- * know a certain volume id indicates a certain disk. If you want to know volume id, please read the below section.
- *
- * <h3>Volume id</h3>
- * Volume id is an integer. Each volume id identifies each disk volume.
- *
- * This volume id can be obtained from org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}. *
- * HDFS cannot give any volume id due to unknown reason and disabled config 'dfs.client.file-block-locations.enabled'.
- * In this case, the volume id will be -1 or other native integer.
- *
- * <h3>See Also</h3>
- * <ul>
- * <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li>
- * </ul>
- */
- public class HostVolumeMapping {
- private final String host;
- private final String rack;
- /** A key is disk volume, and a value is a list of tasks to be scheduled. */
- private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume =
- Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>());
- /** A value is last assigned volume id for each task runner */
- private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId,
- Integer>();
- /**
- * A key is disk volume id, and a value is the load of this volume.
- * This load is measured by counting how many number of tasks are running.
- *
- * These disk volumes are kept in an order of ascending order of the volume id.
- * In other words, the head volume ids are likely to -1, meaning no given volume id.
- */
- private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>();
- /** The total number of remain tasks in this host */
- private AtomicInteger remainTasksNum = new AtomicInteger(0);
- public static final int REMOTE = -2;
-
-
- public HostVolumeMapping(String host, String rack){
- this.host = host;
- this.rack = rack;
- }
-
- public synchronized void addTaskAttempt(int volumeId, TaskAttempt attemptId){
- synchronized (unassignedTaskForEachVolume){
- LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
- if (list == null) {
- list = new LinkedHashSet<TaskAttempt>();
- unassignedTaskForEachVolume.put(volumeId, list);
- }
- list.add(attemptId);
- }
-
- remainTasksNum.incrementAndGet();
-
- if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0);
- }
-
- /**
- * Priorities
- * 1. a task list in a volume of host
- * 2. unknown block or Non-splittable task in host
- * 3. remote tasks. unassignedTaskForEachVolume is only contained local task. so it will be null
- */
- public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) {
- int volumeId;
- TaskAttemptId taskAttemptId = null;
-
- if (!lastAssignedVolumeId.containsKey(containerId)) {
- volumeId = getLowestVolumeId();
- increaseConcurrency(containerId, volumeId);
- } else {
- volumeId = lastAssignedVolumeId.get(containerId);
- }
-
- if (unassignedTaskForEachVolume.size() > 0) {
- int retry = unassignedTaskForEachVolume.size();
- do {
- //clean and get a remaining local task
- taskAttemptId = getAndRemove(volumeId);
- if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
- decreaseConcurrency(containerId);
- if (volumeId > REMOTE) {
- diskVolumeLoads.remove(volumeId);
- }
- }
-
- if (taskAttemptId == null) {
- //reassign next volume
- volumeId = getLowestVolumeId();
- increaseConcurrency(containerId, volumeId);
- retry--;
- } else {
- break;
- }
- } while (retry > 0);
- } else {
- this.remainTasksNum.set(0);
- }
- return taskAttemptId;
- }
-
- public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) {
- TaskAttemptId taskAttemptId = null;
-
- if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) {
- int retry = unassignedTaskForEachVolume.size();
- do {
- //clean and get a remaining task
- int volumeId = getLowestVolumeId();
- taskAttemptId = getAndRemove(volumeId);
- if (taskAttemptId == null) {
- if (volumeId > REMOTE) {
- diskVolumeLoads.remove(volumeId);
- }
- retry--;
- } else {
- break;
- }
- } while (retry > 0);
- }
- return taskAttemptId;
- }
-
- private synchronized TaskAttemptId getAndRemove(int volumeId){
- TaskAttemptId taskAttemptId = null;
- if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId;
-
- LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
- if(list != null && list.size() > 0){
- TaskAttempt taskAttempt;
- synchronized (unassignedTaskForEachVolume) {
- Iterator<TaskAttempt> iterator = list.iterator();
- taskAttempt = iterator.next();
- iterator.remove();
- }
-
- this.remainTasksNum.getAndDecrement();
- taskAttemptId = taskAttempt.getId();
- for (DataLocation location : taskAttempt.getTask().getDataLocations()) {
- if (!this.getHost().equals(location.getHost())) {
- HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
- if (volumeMapping != null) {
- volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt);
- }
- }
- }
- }
-
- if(list == null || list.isEmpty()) {
- unassignedTaskForEachVolume.remove(volumeId);
- }
- return taskAttemptId;
- }
-
- private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){
- if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
-
- LinkedHashSet<TaskAttempt> tasks = unassignedTaskForEachVolume.get(volumeId);
-
- if(tasks != null && tasks.size() > 0){
- tasks.remove(taskAttempt);
- remainTasksNum.getAndDecrement();
- } else {
- unassignedTaskForEachVolume.remove(volumeId);
- }
- }
-
- /**
- * Increase the count of running tasks and disk loads for a certain task runner.
- *
- * @param containerId The task runner identifier
- * @param volumeId Volume identifier
- * @return the volume load (i.e., how many running tasks use this volume)
- */
- private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) {
-
- int concurrency = 1;
- if (diskVolumeLoads.containsKey(volumeId)) {
- concurrency = diskVolumeLoads.get(volumeId) + 1;
- }
-
- if (volumeId > -1) {
- LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
- } else if (volumeId == -1) {
- // this case is disabled namenode block meta or compressed text file or amazon s3
- LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
- } else if (volumeId == REMOTE) {
- // this case has processed all block on host and it will be assigned to remote
- LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
- + ", Remote Concurrency : " + concurrency);
- }
- diskVolumeLoads.put(volumeId, concurrency);
- lastAssignedVolumeId.put(containerId, volumeId);
- return concurrency;
- }
-
- /**
- * Decrease the count of running tasks of a certain task runner
- */
- private synchronized void decreaseConcurrency(TajoContainerId containerId){
- Integer volumeId = lastAssignedVolumeId.get(containerId);
- if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
- Integer concurrency = diskVolumeLoads.get(volumeId);
- if(concurrency > 0){
- diskVolumeLoads.put(volumeId, concurrency - 1);
- } else {
- if (volumeId > REMOTE) {
- diskVolumeLoads.remove(volumeId);
- }
- }
- }
- lastAssignedVolumeId.remove(containerId);
- }
-
- /**
- * volume of a host : 0 ~ n
- * compressed task, amazon s3, unKnown volume : -1
- * remote task : -2
- */
- public int getLowestVolumeId(){
- Map.Entry<Integer, Integer> volumeEntry = null;
-
- for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
- if(volumeEntry == null) volumeEntry = entry;
-
- if (volumeEntry.getValue() >= entry.getValue()) {
- volumeEntry = entry;
- }
- }
-
- if(volumeEntry != null){
- return volumeEntry.getKey();
- } else {
- return REMOTE;
- }
- }
-
- public boolean isAssigned(TajoContainerId containerId){
- return lastAssignedVolumeId.containsKey(containerId);
- }
-
- public boolean isRemote(TajoContainerId containerId){
- Integer volumeId = lastAssignedVolumeId.get(containerId);
- if(volumeId == null || volumeId > REMOTE){
- return false;
- } else {
- return true;
- }
- }
-
- public int getRemoteConcurrency(){
- return getVolumeConcurrency(REMOTE);
- }
-
- public int getVolumeConcurrency(int volumeId){
- Integer size = diskVolumeLoads.get(volumeId);
- if(size == null) return 0;
- else return size;
- }
-
- public int getRemainingLocalTaskSize(){
- return remainTasksNum.get();
- }
-
- public String getHost() {
-
- return host;
- }
-
- public String getRack() {
- return rack;
- }
- }
-
- private class ScheduledRequests {
- // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in
- // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner
- // if the task is not included in leafTasks and nonLeafTasks.
- private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
- private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
- private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap();
- private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
-
- private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) {
- TaskAttempt taskAttempt = event.getTaskAttempt();
- List<DataLocation> locations = taskAttempt.getTask().getDataLocations();
-
- for (DataLocation location : locations) {
- String host = location.getHost();
-
- HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
- if (hostVolumeMapping == null) {
- String rack = RackResolver.resolve(host).getNetworkLocation();
- hostVolumeMapping = new HostVolumeMapping(host, rack);
- leafTaskHostMapping.put(host, hostVolumeMapping);
- }
- hostVolumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added attempt req to host " + host);
- }
-
- HashSet<TaskAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
- if (list == null) {
- list = new HashSet<TaskAttemptId>();
- leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
- }
-
- list.add(taskAttempt.getId());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
- }
- }
-
- leafTasks.add(taskAttempt.getId());
- }
-
- private void addNonLeafTask(TaskAttemptToSchedulerEvent event) {
- nonLeafTasks.add(event.getTaskAttempt().getId());
- }
-
- public int leafTaskNum() {
- return leafTasks.size();
- }
-
- public int nonLeafTaskNum() {
- return nonLeafTasks.size();
- }
-
- public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>();
-
- private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){
- HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
-
- if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
- for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) {
- TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
-
- if(attemptId == null) break;
- //find remaining local task
- if (leafTasks.contains(attemptId)) {
- leafTasks.remove(attemptId);
- //LOG.info(attemptId + " Assigned based on host match " + hostName);
- hostLocalAssigned++;
- totalAssigned++;
- return attemptId;
- }
- }
- }
- return null;
- }
-
- private TaskAttemptId allocateRackTask(String host) {
-
- List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values());
- String rack = RackResolver.resolve(host).getNetworkLocation();
- TaskAttemptId attemptId = null;
-
- if (remainingTasks.size() > 0) {
- synchronized (scheduledRequests) {
- //find largest remaining task of other host in rack
- Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
- @Override
- public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
- // descending remaining tasks
- if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) {
- return 1;
- } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) {
- return 0;
- } else {
- return -1;
- }
- }
- });
- }
-
- for (HostVolumeMapping tasks : remainingTasks) {
- for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) {
- TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack);
-
- if (tId == null) break;
-
- if (leafTasks.contains(tId)) {
- leafTasks.remove(tId);
- attemptId = tId;
- break;
- }
- }
- if(attemptId != null) break;
- }
- }
-
- //find task in rack
- if (attemptId == null) {
- HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack);
- if (list != null) {
- synchronized (list) {
- Iterator<TaskAttemptId> iterator = list.iterator();
- while (iterator.hasNext()) {
- TaskAttemptId tId = iterator.next();
- iterator.remove();
- if (leafTasks.contains(tId)) {
- leafTasks.remove(tId);
- attemptId = tId;
- break;
- }
- }
- }
- }
- }
-
- if (attemptId != null) {
- rackLocalAssigned++;
- totalAssigned++;
-
- LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
- hostLocalAssigned, rackLocalAssigned, totalAssigned,
- ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
-
- }
- return attemptId;
- }
-
- public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
- Collections.shuffle(taskRequests);
- LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();
-
- TaskRequestEvent taskRequest;
- while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
- taskRequest = taskRequests.pollFirst();
- if(taskRequest == null) { // if there are only remote task requests
- taskRequest = remoteTaskRequests.pollFirst();
- }
-
- // checking if this container is still alive.
- // If not, ignore the task request and stop the task runner
- ContainerProxy container = context.getMasterContext().getResourceAllocator()
- .getContainer(taskRequest.getContainerId());
- if(container == null) {
- taskRequest.getCallback().run(stopTaskRunnerReq);
- continue;
- }
-
- // getting the hostname of requested node
- WorkerConnectionInfo connectionInfo =
- context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId());
- String host = connectionInfo.getHost();
-
- // if there are no worker matched to the hostname a task request
- if(!leafTaskHostMapping.containsKey(host)){
- String normalizedHost = NetUtils.normalizeHost(host);
-
- if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){
- // this case means one of either cases:
- // * there are no blocks which reside in this node.
- // * all blocks which reside in this node are consumed, and this task runner requests a remote task.
- // In this case, we transfer the task request to the remote task request list, and skip the followings.
- remoteTaskRequests.add(taskRequest);
- continue;
- }
- }
-
- TajoContainerId containerId = taskRequest.getContainerId();
- LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
- "containerId=" + containerId);
-
- //////////////////////////////////////////////////////////////////////
- // disk or host-local allocation
- //////////////////////////////////////////////////////////////////////
- TaskAttemptId attemptId = allocateLocalTask(host, containerId);
-
- if (attemptId == null) { // if a local task cannot be found
- HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
-
- if(hostVolumeMapping != null) {
- if(!hostVolumeMapping.isRemote(containerId)){
- // assign to remote volume
- hostVolumeMapping.decreaseConcurrency(containerId);
- hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
- }
- // this part is remote concurrency management of a tail tasks
- int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
-
- if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
- //release container
- hostVolumeMapping.decreaseConcurrency(containerId);
- taskRequest.getCallback().run(stopTaskRunnerReq);
- continue;
- }
- }
-
- //////////////////////////////////////////////////////////////////////
- // rack-local allocation
- //////////////////////////////////////////////////////////////////////
- attemptId = allocateRackTask(host);
-
- //////////////////////////////////////////////////////////////////////
- // random node allocation
- //////////////////////////////////////////////////////////////////////
- if (attemptId == null && leafTaskNum() > 0) {
- synchronized (leafTasks){
- attemptId = leafTasks.iterator().next();
- leafTasks.remove(attemptId);
- rackLocalAssigned++;
- totalAssigned++;
- LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
- hostLocalAssigned, rackLocalAssigned, totalAssigned,
- ((double) hostLocalAssigned / (double) totalAssigned) * 100));
- }
- }
- }
-
- if (attemptId != null) {
- Task task = stage.getTask(attemptId.getTaskId());
- TaskRequest taskAssign = new TaskRequestImpl(
- attemptId,
- new ArrayList<FragmentProto>(task.getAllFragments()),
- "",
- false,
- LogicalNodeSerializer.serialize(task.getLogicalPlan()),
- context.getMasterContext().getQueryContext(),
- stage.getDataChannel(), stage.getBlock().getEnforcer());
- if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
- taskAssign.setInterQuery();
- }
-
- context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- taskRequest.getContainerId(), connectionInfo));
- assignedRequest.add(attemptId);
-
- scheduledObjectNum--;
- taskRequest.getCallback().run(taskAssign.getProto());
- } else {
- throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
- }
- }
- }
-
- private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
- if (masterPlan.isRoot(block)) {
- return false;
- }
-
- ExecutionBlock parent = masterPlan.getParent(block);
- if (masterPlan.isRoot(parent) && parent.hasUnion()) {
- return false;
- }
-
- return true;
- }
-
- public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
- Collections.shuffle(taskRequests);
-
- TaskRequestEvent taskRequest;
- while (!taskRequests.isEmpty()) {
- taskRequest = taskRequests.pollFirst();
- LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
-
- TaskAttemptId attemptId;
- // random allocation
- if (nonLeafTasks.size() > 0) {
- synchronized (nonLeafTasks){
- attemptId = nonLeafTasks.iterator().next();
- nonLeafTasks.remove(attemptId);
- }
- LOG.debug("Assigned based on * match");
-
- Task task;
- task = stage.getTask(attemptId.getTaskId());
- TaskRequest taskAssign = new TaskRequestImpl(
- attemptId,
- Lists.newArrayList(task.getAllFragments()),
- "",
- false,
- LogicalNodeSerializer.serialize(task.getLogicalPlan()),
- context.getMasterContext().getQueryContext(),
- stage.getDataChannel(),
- stage.getBlock().getEnforcer());
- if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
- taskAssign.setInterQuery();
- }
- for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) {
- Collection<FetchImpl> fetches = entry.getValue();
- if (fetches != null) {
- for (FetchImpl fetch : fetches) {
- taskAssign.addFetch(entry.getKey(), fetch);
- }
- }
- }
-
- WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator().
- getWorkerConnectionInfo(taskRequest.getWorkerId());
- context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- taskRequest.getContainerId(), connectionInfo));
- taskRequest.getCallback().run(taskAssign.getProto());
- totalAssigned++;
- scheduledObjectNum--;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
deleted file mode 100644
index 21e376c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.event.TaskSchedulerEvent;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.util.List;
-import java.util.Map;
-
- /**
-  * Task-scheduler event that carries a map of fetches for one execution block.
-  * The map is stored as given (no defensive copy) and returned as-is by {@link #getFetches()}.
-  * NOTE(review): the meaning of the String keys is not visible here -- TODO confirm with the producer.
-  */
- public class FetchScheduleEvent extends TaskSchedulerEvent {
-   /** Fetch lists keyed by String, exactly as supplied to the constructor. */
-   private final Map<String, List<FetchImpl>> fetches;
-
-   /**
-    * @param type      scheduler event type, forwarded to {@link TaskSchedulerEvent}
-    * @param ebId      execution block this event refers to
-    * @param fetchMap  fetches to attach to the event (stored by reference)
-    */
-   public FetchScheduleEvent(final EventType type, final ExecutionBlockId ebId,
-                             final Map<String, List<FetchImpl>> fetchMap) {
-     super(type, ebId);
-     fetches = fetchMap;
-   }
-
-   /** @return the fetch map passed at construction time (same instance) */
-   public Map<String, List<FetchImpl>> getFetches() {
-     return this.fetches;
-   }
- }
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
deleted file mode 100644
index 827386b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.base.Objects;
-import org.apache.tajo.storage.fragment.Fragment;
-
-/**
- * FragmentPair consists of two fragments, a left fragment and a right fragment.
- * According to queries, it can have the different values.
- * For join queries, it is assumed to have both fragments.
- * Also, the left fragment is assumed to be a fragment of the larger table.
- * For other queries, it is assumed to have only a left fragment.
- */
-public class FragmentPair {
- private Fragment leftFragment;
- private Fragment rightFragment;
-
- public FragmentPair(Fragment left) {
- this.leftFragment = left;
- }
-
- public FragmentPair(Fragment left, Fragment right) {
- this.leftFragment = left;
- this.rightFragment = right;
- }
-
- public Fragment getLeftFragment() {
- return leftFragment;
- }
-
- public Fragment getRightFragment() {
- return rightFragment;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof FragmentPair) {
- FragmentPair other = (FragmentPair) o;
- boolean eq = this.leftFragment.equals(other.leftFragment);
- if (this.rightFragment != null && other.rightFragment != null) {
- eq &= this.rightFragment.equals(other.rightFragment);
- } else if (this.rightFragment == null && other.rightFragment == null) {
- eq &= true;
- } else {
- return false;
- }
- return eq;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(leftFragment, rightFragment);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 51964f0..9d853a5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -39,7 +39,7 @@ import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.DDLExecutor;
import org.apache.tajo.master.exec.QueryExecutor;
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
import org.apache.tajo.plan.*;
import org.apache.tajo.plan.logical.InsertNode;
import org.apache.tajo.plan.logical.LogicalRootNode;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
deleted file mode 100644
index d6ea459..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.engine.planner.physical.SeqScanExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner {
- private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
-
- private QueryId queryId;
- private String sessionId;
- private SeqScanExec scanExec;
- private TableDesc tableDesc;
- private RowStoreEncoder rowEncoder;
- private int maxRow;
- private int currentNumRows;
- private TaskAttemptContext taskContext;
- private TajoConf tajoConf;
- private ScanNode scanNode;
-
- private int currentFragmentIndex = 0;
-
- public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode,
- TableDesc tableDesc, int maxRow) throws IOException {
- this.tajoConf = tajoConf;
- this.sessionId = sessionId;
- this.queryId = queryId;
- this.scanNode = scanNode;
- this.tableDesc = tableDesc;
- this.maxRow = maxRow;
- this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
- }
-
- public void init() throws IOException {
- initSeqScanExec();
- }
-
- private void initSeqScanExec() throws IOException {
- List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
- .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
-
- if (fragments != null && !fragments.isEmpty()) {
- FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {}));
- this.taskContext = new TaskAttemptContext(
- new QueryContext(tajoConf), null,
- new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
- fragmentProtos, null);
- try {
- // scanNode must be clone cause SeqScanExec change target in the case of
- // a partitioned table.
- scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos);
- } catch (CloneNotSupportedException e) {
- throw new IOException(e.getMessage(), e);
- }
- scanExec.init();
- currentFragmentIndex += fragments.size();
- }
- }
-
- public QueryId getQueryId() {
- return queryId;
- }
-
- public String getSessionId() {
- return sessionId;
- }
-
- public void setScanExec(SeqScanExec scanExec) {
- this.scanExec = scanExec;
- }
-
- public TableDesc getTableDesc() {
- return tableDesc;
- }
-
- public void close() throws Exception {
- if (scanExec != null) {
- scanExec.close();
- scanExec = null;
- }
- }
-
- public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
- List<ByteString> rows = new ArrayList<ByteString>();
- if (scanExec == null) {
- return rows;
- }
- int rowCount = 0;
- while (true) {
- Tuple tuple = scanExec.next();
- if (tuple == null) {
- scanExec.close();
- scanExec = null;
- initSeqScanExec();
- if (scanExec != null) {
- tuple = scanExec.next();
- }
- if (tuple == null) {
- if (scanExec != null) {
- scanExec.close();
- scanExec = null;
- }
- break;
- }
- }
- rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
- rowCount++;
- currentNumRows++;
- if (rowCount >= fetchRowNum) {
- break;
- }
- if (currentNumRows >= maxRow) {
- scanExec.close();
- scanExec = null;
- break;
- }
- }
- return rows;
- }
-
- @Override
- public Schema getLogicalSchema() {
- return tableDesc.getLogicalSchema();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
deleted file mode 100644
index 7e7d705..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.tajo.QueryId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-
-import com.google.protobuf.ByteString;
-
- /**
-  * Pull-style scanner over the result rows of a query.
-  * NOTE(review): the expected lifecycle appears to be {@link #init()}, then repeated
-  * {@link #getNextRows(int)} calls, then {@link #close()} -- confirm against callers.
-  */
- public interface NonForwardQueryResultScanner {
-
-   /** Releases any resources held by the scanner. */
-   void close() throws Exception;
-
-   /** @return the logical schema of the rows produced by this scanner */
-   Schema getLogicalSchema();
-
-   /**
-    * @param fetchRowNum maximum number of rows to return from this call
-    * @return the next batch of encoded rows
-    */
-   List<ByteString> getNextRows(int fetchRowNum) throws IOException;
-
-   /** @return the id of the query whose result is scanned */
-   QueryId getQueryId();
-
-   /** @return the id of the session that owns this scanner */
-   String getSessionId();
-
-   /** @return the descriptor of the table being scanned */
-   TableDesc getTableDesc();
-
-   /** Prepares the scanner for reading. */
-   void init() throws IOException;
-
- }