You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@zeppelin.apache.org by GitBox <gi...@apache.org> on 2019/05/27 00:27:26 UTC

[GitHub] [zeppelin] felixcheung commented on a change in pull request #3372: [ZEPPELIN-3623] Create interpreter process in the cluster mode

felixcheung commented on a change in pull request #3372: [ZEPPELIN-3623] Create interpreter process in the cluster mode
URL: https://github.com/apache/zeppelin/pull/3372#discussion_r287621012
 
 

 ##########
 File path: zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
 ##########
 @@ -49,12 +60,91 @@
   ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) {
     super(id);
     this.interpreterSetting = interpreterSetting;
+    clusterManagerServer = ClusterManagerServer.getInstance();
   }
 
   public InterpreterSetting getInterpreterSetting() {
     return interpreterSetting;
   }
 
+  // Create an interpreter process in the cluster
+  public synchronized RemoteInterpreterProcess getOrCreateClusterIntpProcess(
+      String userName, Properties properties, ClusterIntpProcParameters clusterInterpreterParam)
+      throws IOException {
+
+    if (null == remoteInterpreterProcess) {
+      LOGGER.info("Create cluster InterpreterProcess for InterpreterGroup: " + getId());
+      HashMap<String, Object> intpProcMeta
+          = clusterManagerServer.getClusterMeta(ClusterMetaType.IntpProcessMeta, id).get(id);
+      // exist Interpreter Process
+      if (null != intpProcMeta
+          && intpProcMeta.containsKey(ClusterMeta.INTP_TSERVER_HOST)
+          && intpProcMeta.containsKey(ClusterMeta.INTP_TSERVER_PORT)) {
+        // Borrow properties variable
+        String intpTSrvHost = (String) intpProcMeta.get(ClusterMeta.INTP_TSERVER_HOST);
+        String intpTSrvPort = intpProcMeta.get(ClusterMeta.INTP_TSERVER_PORT).toString();
+        properties.put(ClusterManagerServer.CONNET_EXISTING_PROCESS, "true");
+        properties.put(ClusterMeta.INTP_TSERVER_HOST, intpTSrvHost);
+        properties.put(ClusterMeta.INTP_TSERVER_PORT, intpTSrvPort);
+      } else {
+        // No process was found for the InterpreterGroup ID
+        HashMap<String, Object> meta = clusterManagerServer.getIdleNodeMeta();
+        if (null == meta) {
+          LOGGER.error("don't get idle node meta.");
+          return null;
+        }
+        try {
+          String srvHost = (String)meta.get(ClusterMeta.SERVER_TSERVER_HOST);
+          String localhost = RemoteInterpreterUtils.findAvailableHostAddress();
+          if (localhost.equalsIgnoreCase(srvHost)) {
+            getOrCreateInterpreterProcess(userName, properties);
+          } else {
+            int srvPort = (int)meta.get(ClusterMeta.SERVER_TSERVER_PORT);
+            clusterManagerServer.openRemoteInterpreterProcess(srvHost, srvPort, clusterInterpreterParam);
+            HashMap<String, Object> intpMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.IntpProcessMeta, id).get(id);
+            int retryGetMeta = 0;
+            while ((++retryGetMeta < 20) &
+                (null == intpMeta || !intpMeta.containsKey(ClusterMeta.INTP_TSERVER_HOST)
+                    || !intpMeta.containsKey(ClusterMeta.INTP_TSERVER_PORT)) ) {
+              try {
+                Thread.sleep(500);
+                intpMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.IntpProcessMeta, id).get(id);
+                LOGGER.warn("retry {} times to get {} meta!", retryGetMeta, id);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+              }
+            }
+
+            // Check if the remote creation process is successful
+            if (null == intpMeta || !intpMeta.containsKey(ClusterMeta.INTP_TSERVER_HOST)
+                || !intpMeta.containsKey(ClusterMeta.INTP_TSERVER_PORT)) {
+              LOGGER.error("Creating process {} failed on remote server {}:{}, {}.",
+                  id, srvHost, srvPort, clusterInterpreterParam);
+              return null;
+            }
+
+            // Borrow properties variable
+            String intpTSrvHost = (String) intpMeta.get(ClusterMeta.INTP_TSERVER_HOST);
+            String intpTSrvPort = intpMeta.get(ClusterMeta.INTP_TSERVER_PORT).toString();
+            properties.put(ClusterManagerServer.CONNET_EXISTING_PROCESS, "true");
+            properties.put(ClusterMeta.INTP_TSERVER_HOST, intpTSrvHost);
+            properties.put(ClusterMeta.INTP_TSERVER_PORT, intpTSrvPort);
+          }
+        } catch (TException e) {
+          LOGGER.error(e.getMessage());
 
 Review comment:
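   Please use LOGGER.error(message, e) here instead of LOGGER.error(e.getMessage()), so that the exception and its stack trace are logged rather than only the message text.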

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services