You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/12/15 07:48:05 UTC
tajo git commit: TAJO-2021: Some refactoring and changing anonymous class to lambda expression.
Repository: tajo
Updated Branches:
refs/heads/master bfc70203b -> afa9b432f
TAJO-2021: Some refactoring and changing anonymous class to lambda expression.
Closes #911
Signed-off-by: Jinho Kim <jh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/afa9b432
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/afa9b432
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/afa9b432
Branch: refs/heads/master
Commit: afa9b432fd357a2e958ece7917a7704aadb9b3ab
Parents: bfc7020
Author: Jongyoung Park <em...@gmail.com>
Authored: Tue Dec 15 15:46:39 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Tue Dec 15 15:46:39 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../java/org/apache/tajo/worker/TajoWorker.java | 78 ++++----------------
2 files changed, 17 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/afa9b432/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c297940..74a86fa 100644
--- a/CHANGES
+++ b/CHANGES
@@ -121,6 +121,9 @@ Release 0.12.0 - unreleased
TASKS
+ TAJO-2021: Some refactoring and changing anonymous class to lambda expression.
+ (Contributed by Jongyoung Park. Committed by jinho)
+
TAJO-2019: Replace manual array copy with System.arraycopy().
(Contributed by Dongkyu Hwangbo, committed by jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/afa9b432/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index c7cac4f..8315c1c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -18,7 +18,6 @@
package org.apache.tajo.worker;
-import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,10 +32,8 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.function.FunctionLoader;
-import org.apache.tajo.function.FunctionSignature;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.metrics.Node;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
@@ -81,55 +78,26 @@ public class TajoWorker extends CompositeService {
private static final Log LOG = LogFactory.getLog(TajoWorker.class);
private TajoConf systemConf;
-
private StaticHttpServer webServer;
-
- private TajoWorkerClientService tajoWorkerClientService;
-
private QueryMasterManagerService queryMasterManagerService;
-
- private TajoWorkerManagerService tajoWorkerManagerService;
-
- private TajoMasterInfo tajoMasterInfo;
-
private CatalogClient catalogClient;
-
private WorkerContext workerContext;
-
private TaskManager taskManager;
-
private TaskExecutor taskExecutor;
-
private TajoPullServerService pullService;
-
private ServiceTracker serviceTracker;
-
private NodeResourceManager nodeResourceManager;
-
- private NodeStatusUpdater nodeStatusUpdater;
-
private AtomicBoolean stopped = new AtomicBoolean(false);
-
private WorkerConnectionInfo connectionInfo;
-
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-
private String[] cmdArgs;
-
private DeletionService deletionService;
-
private TajoSystemMetrics workerSystemMetrics;
-
private HashShuffleAppenderManager hashShuffleAppenderManager;
-
- private AsyncDispatcher dispatcher;
-
private LocalDirAllocator lDirAllocator;
-
private JvmPauseMonitor pauseMonitor;
private HistoryWriter taskHistoryWriter;
-
private HistoryReader historyReader;
public TajoWorker() throws Exception {
@@ -143,9 +111,12 @@ public class TajoWorker extends CompositeService {
start();
}
-
@Override
public void serviceInit(Configuration conf) throws Exception {
+ AsyncDispatcher dispatcher;
+ TajoWorkerClientService tajoWorkerClientService;
+ TajoWorkerManagerService tajoWorkerManagerService;
+
ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY);
this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
@@ -156,13 +127,11 @@ public class TajoWorker extends CompositeService {
this.workerContext = new TajoWorkerContext();
this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-
int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
-
- this.dispatcher = new AsyncDispatcher();
+ dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
@@ -186,8 +155,7 @@ public class TajoWorker extends CompositeService {
this.nodeResourceManager = new NodeResourceManager(rmDispatcher, workerContext);
addService(nodeResourceManager);
- this.nodeStatusUpdater = new NodeStatusUpdater(workerContext);
- addService(nodeStatusUpdater);
+ addService(new NodeStatusUpdater(workerContext));
int httpPort = 0;
if(!TajoPullServerService.isStandalone()) {
@@ -240,29 +208,15 @@ public class TajoWorker extends CompositeService {
private void initWorkerMetrics() {
workerSystemMetrics = new TajoSystemMetrics(systemConf, Node.class, workerContext.getWorkerName());
+
workerSystemMetrics.start();
- workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- if(queryMasterManagerService != null) {
- return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size();
- } else {
- return 0;
- }
- }
- });
-
- workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- if(taskExecutor != null) {
- return taskExecutor.getRunningTasks();
- } else {
- return 0;
- }
- }
- });
+ workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM,
+ () -> queryMasterManagerService != null ?
+ queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size() : 0);
+
+ workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS,
+ () -> taskExecutor != null ? taskExecutor.getRunningTasks() : 0);
}
private int initWebServer() {
@@ -310,7 +264,7 @@ public class TajoWorker extends CompositeService {
public void serviceStart() throws Exception {
startJvmPauseMonitor();
- tajoMasterInfo = new TajoMasterInfo();
+ TajoMasterInfo tajoMasterInfo = new TajoMasterInfo();
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress());
@@ -453,10 +407,6 @@ public class TajoWorker extends CompositeService {
return catalogClient;
}
- public TajoPullServerService getPullService() {
- return pullService;
- }
-
public WorkerConnectionInfo getConnectionInfo() {
return connectionInfo;
}