You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@heron.apache.org by GitBox <gi...@apache.org> on 2018/05/17 20:53:41 UTC

[GitHub] nlu90 closed pull request #2901: Refactor CheckpointManager to move initialization code out of constructor

nlu90 closed pull request #2901: Refactor CheckpointManager to move initialization code out of constructor
URL: https://github.com/apache/incubator-heron/pull/2901
 
 
   

This pull request was merged from a forked repository.
Because GitHub does not display the original diff of a fork-based
(foreign) pull request once it has been merged, the diff is
reproduced below for the sake of provenance:

diff --git a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
index fa9c435f9c..5c590ffca5 100644
--- a/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
+++ b/heron/ckptmgr/src/java/org/apache/heron/ckptmgr/CheckpointManager.java
@@ -50,8 +50,8 @@
   private static final String CHECKPOINT_MANAGER_HOST = "127.0.0.1";
 
   // The looper drives CheckpointManagerServer
-  private final NIOLooper checkpointManagerServerLoop;
-  private final CheckpointManagerServer checkpointManagerServer;
+  private NIOLooper checkpointManagerServerLoop;
+  private CheckpointManagerServer checkpointManagerServer;
 
   // Print usage options
   private static void usage(Options options) {
@@ -133,14 +133,21 @@ private static Options constructHelpOptions() {
     return options;
   }
 
-  public CheckpointManager(
-      String topologyName, String topologyId, String checkpointMgrId,
-      String serverHost, int serverPort,
-      SystemConfig systemConfig, CheckpointManagerConfig checkpointManagerConfig)
-      throws IOException, CheckpointManagerException {
+  public CheckpointManager() {
+  }
 
-    this.checkpointManagerServerLoop = new NIOLooper();
+  public void init(
+      String topologyName,
+      String topologyId,
+      String checkpointMgrId,
+      String serverHost,
+      int serverPort,
+      SystemConfig systemConfig,
+      CheckpointManagerConfig checkpointManagerConfig)
+      throws IOException, CheckpointManagerException {
 
+    LOG.info("Initializing CheckpointManager");
+    checkpointManagerServerLoop = new NIOLooper();
     HeronSocketOptions serverSocketOptions =
         new HeronSocketOptions(
             checkpointManagerConfig.getWriteBatchSize(),
@@ -152,9 +159,29 @@ public CheckpointManager(
             checkpointManagerConfig.getMaximumPacketSize());
 
     // Setup the IStatefulStorage
-    // TODO(mfu): This should be done in an executor driven by another thread, kind of async
+    IStatefulStorage statefulStorage = setupStatefulStorage(topologyName, checkpointManagerConfig);
+
+    // Start the server
+    this.checkpointManagerServer = new CheckpointManagerServer(
+        topologyName, topologyId, checkpointMgrId, statefulStorage,
+        checkpointManagerServerLoop, serverHost, serverPort, serverSocketOptions);
+  }
+
+  public void startAndLoop() {
+    // The CheckpointManagerServer would run in the main thread
+    // We do it in the final step since it would await the main thread
+    LOG.info("Starting CheckpointManager Server");
+    checkpointManagerServer.start();
+    checkpointManagerServerLoop.loop();
+  }
+
+  private static IStatefulStorage setupStatefulStorage(
+      String topologyName,
+      CheckpointManagerConfig checkpointManagerConfig) throws CheckpointManagerException {
+
     IStatefulStorage statefulStorage;
     String classname = checkpointManagerConfig.getStorageClassname();
+
     try {
       statefulStorage = (IStatefulStorage) Class.forName(classname).newInstance();
     } catch (InstantiationException e) {
@@ -172,18 +199,7 @@ public CheckpointManager(
       throw new CheckpointManagerException(classname + " init threw exception", e);
     }
 
-    // Start the server
-    this.checkpointManagerServer = new CheckpointManagerServer(
-        topologyName, topologyId, checkpointMgrId, statefulStorage,
-        checkpointManagerServerLoop, serverHost, serverPort, serverSocketOptions);
-  }
-
-  public void startAndLoop() {
-    // The CheckpointManagerServer would run in the main thread
-    // We do it in the final step since it would await the main thread
-    LOG.info("Starting CheckpointManager Server");
-    checkpointManagerServer.start();
-    checkpointManagerServerLoop.loop();
+    return statefulStorage;
   }
 
   public static void main(String[] args) throws IOException,
@@ -241,9 +257,9 @@ public static void main(String[] args) throws IOException,
 
     LOG.info("System Config: " + systemConfig);
 
-    CheckpointManager checkpointManager =
-        new CheckpointManager(topologyName, topologyId, ckptmgrId,
-            CHECKPOINT_MANAGER_HOST, port, systemConfig, ckptmgrConfig);
+    CheckpointManager checkpointManager = new CheckpointManager();
+    checkpointManager.init(topologyName, topologyId, ckptmgrId,
+        CHECKPOINT_MANAGER_HOST, port, systemConfig, ckptmgrConfig);
     checkpointManager.startAndLoop();
 
     LOG.info("Loops terminated. Exiting.");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services