You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/12/15 07:48:05 UTC

tajo git commit: TAJO-2021: Some refactoring and changing anonymous class to lambda expression.

Repository: tajo
Updated Branches:
  refs/heads/master bfc70203b -> afa9b432f


TAJO-2021: Some refactoring and changing anonymous class to lambda expression.

Closes #911

Signed-off-by: Jinho Kim <jh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/afa9b432
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/afa9b432
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/afa9b432

Branch: refs/heads/master
Commit: afa9b432fd357a2e958ece7917a7704aadb9b3ab
Parents: bfc7020
Author: Jongyoung Park <em...@gmail.com>
Authored: Tue Dec 15 15:46:39 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Tue Dec 15 15:46:39 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../java/org/apache/tajo/worker/TajoWorker.java | 78 ++++----------------
 2 files changed, 17 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/afa9b432/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c297940..74a86fa 100644
--- a/CHANGES
+++ b/CHANGES
@@ -121,6 +121,9 @@ Release 0.12.0 - unreleased
 
   TASKS
 
+    TAJO-2021: Some refactoring and changing anonymous class to lambda expression.
+    (Contributed by Jongyoung Park. Committed by jinho)
+
     TAJO-2019: Replace manual array copy with System.arraycopy(). 
     (Contributed by Dongkyu Hwangbo, committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/afa9b432/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index c7cac4f..8315c1c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.worker;
 
-import com.codahale.metrics.Gauge;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,10 +32,8 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.function.FunctionLoader;
-import org.apache.tajo.function.FunctionSignature;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.metrics.Node;
 import org.apache.tajo.plan.function.python.PythonScriptEngine;
@@ -81,55 +78,26 @@ public class TajoWorker extends CompositeService {
   private static final Log LOG = LogFactory.getLog(TajoWorker.class);
 
   private TajoConf systemConf;
-
   private StaticHttpServer webServer;
-
-  private TajoWorkerClientService tajoWorkerClientService;
-
   private QueryMasterManagerService queryMasterManagerService;
-
-  private TajoWorkerManagerService tajoWorkerManagerService;
-
-  private TajoMasterInfo tajoMasterInfo;
-
   private CatalogClient catalogClient;
-
   private WorkerContext workerContext;
-
   private TaskManager taskManager;
-
   private TaskExecutor taskExecutor;
-
   private TajoPullServerService pullService;
-
   private ServiceTracker serviceTracker;
-
   private NodeResourceManager nodeResourceManager;
-
-  private NodeStatusUpdater nodeStatusUpdater;
-
   private AtomicBoolean stopped = new AtomicBoolean(false);
-
   private WorkerConnectionInfo connectionInfo;
-
   private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-
   private String[] cmdArgs;
-
   private DeletionService deletionService;
-
   private TajoSystemMetrics workerSystemMetrics;
-
   private HashShuffleAppenderManager hashShuffleAppenderManager;
-
-  private AsyncDispatcher dispatcher;
-
   private LocalDirAllocator lDirAllocator;
-
   private JvmPauseMonitor pauseMonitor;
 
   private HistoryWriter taskHistoryWriter;
-
   private HistoryReader historyReader;
 
   public TajoWorker() throws Exception {
@@ -143,9 +111,12 @@ public class TajoWorker extends CompositeService {
     start();
   }
 
-
   @Override
   public void serviceInit(Configuration conf) throws Exception {
+    AsyncDispatcher dispatcher;
+    TajoWorkerClientService tajoWorkerClientService;
+    TajoWorkerManagerService tajoWorkerManagerService;
+
     ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY);
 
     this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
@@ -156,13 +127,11 @@ public class TajoWorker extends CompositeService {
     this.workerContext = new TajoWorkerContext();
     this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
-
     int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
     int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
     int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
 
-
-    this.dispatcher = new AsyncDispatcher();
+    dispatcher = new AsyncDispatcher();
     addIfService(dispatcher);
 
     tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
@@ -186,8 +155,7 @@ public class TajoWorker extends CompositeService {
     this.nodeResourceManager = new NodeResourceManager(rmDispatcher, workerContext);
     addService(nodeResourceManager);
 
-    this.nodeStatusUpdater = new NodeStatusUpdater(workerContext);
-    addService(nodeStatusUpdater);
+    addService(new NodeStatusUpdater(workerContext));
 
     int httpPort = 0;
     if(!TajoPullServerService.isStandalone()) {
@@ -240,29 +208,15 @@ public class TajoWorker extends CompositeService {
 
   private void initWorkerMetrics() {
     workerSystemMetrics = new TajoSystemMetrics(systemConf, Node.class, workerContext.getWorkerName());
+    
     workerSystemMetrics.start();
 
-    workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, new Gauge<Integer>() {
-      @Override
-      public Integer getValue() {
-        if(queryMasterManagerService != null) {
-          return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size();
-        } else {
-          return 0;
-        }
-      }
-    });
-
-    workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, new Gauge<Integer>() {
-      @Override
-      public Integer getValue() {
-        if(taskExecutor != null) {
-          return taskExecutor.getRunningTasks();
-        } else {
-          return 0;
-        }
-      }
-    });
+    workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM,
+        () -> queryMasterManagerService != null ?
+            queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size() : 0);
+
+    workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS,
+        () -> taskExecutor != null ? taskExecutor.getRunningTasks() : 0);
   }
 
   private int initWebServer() {
@@ -310,7 +264,7 @@ public class TajoWorker extends CompositeService {
   public void serviceStart() throws Exception {
     startJvmPauseMonitor();
 
-    tajoMasterInfo = new TajoMasterInfo();
+    TajoMasterInfo tajoMasterInfo = new TajoMasterInfo();
 
     if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
       tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress());
@@ -453,10 +407,6 @@ public class TajoWorker extends CompositeService {
       return catalogClient;
     }
 
-    public TajoPullServerService getPullService() {
-      return pullService;
-    }
-
     public WorkerConnectionInfo getConnectionInfo() {
       return connectionInfo;
     }