You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/17 17:28:38 UTC

git commit: Improvement to zk usage in orchestrator

Repository: airavata
Updated Branches:
  refs/heads/master ee1df1b4a -> b8bb82ce8


Improvement to zk usage in orchestrator


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b8bb82ce
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b8bb82ce
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b8bb82ce

Branch: refs/heads/master
Commit: b8bb82ce835062dd741d8ff76041f25049499977
Parents: ee1df1b
Author: lahiru <la...@apache.org>
Authored: Wed Sep 17 11:28:34 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Sep 17 11:28:34 2014 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  | 14 ++--
 .../impl/push/amqp/SimpleJobFinishConsumer.java |  4 +-
 .../server/OrchestratorServerHandler.java       | 72 ++++++++++++--------
 .../core/context/OrchestratorContext.java       |  6 +-
 4 files changed, 54 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b8bb82ce/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 827a187..490fbf9 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -46,9 +46,9 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_USER = "default.registry.user";
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     private static Airavata.Client airavataClient;
-    private static String echoAppId = "Echo_bb2fe905-718b-4f4a-91ad-e1551feb06c3";
-    private static String wrfAppId = "WRF_35684c10-7749-4691-939b-225903fd2f73";
-    private static String amberAppId = "Amber_f4384478-a707-45f9-b2ed-477fe7fb486b";
+    private static String echoAppId = "Echo_b7cebf37-df12-4803-a50c-efdbc2edd9b6";
+    private static String wrfAppId = "WRF_5f097c9c-7066-49ec-aed7-4e39607b3adc";
+    private static String amberAppId = "Amber_89906be6-5678-49a6-9d04-a0604fbdef2e";
 
     private static String localHost = "localhost";
     private static String trestlesHostName = "trestles.sdsc.xsede.org";
@@ -59,14 +59,14 @@ public class CreateLaunchExperiment {
         try {
             airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
             System.out.println("API version is " + airavataClient.getAPIVersion());
-//            registerApplications();
+            registerApplications();
 
 
 
 ////            final String expId = createExperimentForSSHHost(airavata);
 //            final String expId = createEchoExperimentForTrestles(airavataClient);
 //            final String expId = createEchoExperimentForStampede(airavataClient);
-            final String expId = createExperimentEchoForLocalHost(airavataClient);
+//            final String expId = createExperimentEchoForLocalHost(airavataClient);
 //            final String expId = createExperimentWRFTrestles(airavataClient);
 //            final String expId = createExperimentForBR2(airavataClient);
 //            final String expId = createExperimentForBR2Amber(airavataClient);
@@ -74,9 +74,9 @@ public class CreateLaunchExperiment {
 //            final String expId = createExperimentForStampedeAmber(airavataClient);
 //            final String expId = createExperimentForTrestlesAmber(airavataClient);
 
-            System.out.println("Experiment ID : " + expId);
+//            System.out.println("Experiment ID : " + expId);
 //            updateExperiment(airavata, expId);
-            launchExperiment(airavataClient, expId);
+//            launchExperiment(airavataClient, expId);
 
 //            System.out.println("retrieved exp id : " + experiment.getExperimentID());
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8bb82ce/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
index 3d62fc0..407d208 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
@@ -63,13 +63,13 @@ public class SimpleJobFinishConsumer {
                             ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                         }
                     } catch (Exception ex) {
-                        logger.error("Cannot connect to a RabbitMQ Server: " + ex);
+                        logger.error("Cannot connect to a RabbitMQ Server: " , ex);
                     }
                 }
 
             }).start();
         } catch (Exception ex) {
-            logger.error("Cannot connect to a RabbitMQ Server: " + ex);
+            logger.error("Cannot connect to a RabbitMQ Server: " , ex);
             logger.info("------------- Push monitoring for HPC jobs is disabled -------------");
         }
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8bb82ce/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 1f5160f..c1a00a8 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -103,34 +103,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 				synchronized (mutex) {
 					mutex.wait(); // waiting for the syncConnected event
 				}
-				Stat zkStat = zk.exists(OrchServer, false);
-				if (zkStat == null) {
-					zk.create(OrchServer, new byte[0],
-							ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-				}
-				String instantNode = OrchServer
-						+ File.separator
-						+ String.valueOf(new Random()
-								.nextInt(Integer.MAX_VALUE));
-				zkStat = zk.exists(instantNode, false);
-				if (zkStat == null) {
-					zk.create(instantNode, airavataServerHostPort.getBytes(),
-							ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // other
-																				// component
-																				// will
-																				// watch
-																				// these
-																				// childeren
-																				// creation
-																				// deletion
-																				// to
-																				// monitor
-																				// the
-																				// status
-																				// of
-																				// the
-																				// node
-				}
+                registerOrchestratorService(airavataServerHostPort, OrchServer);
 				// creating a watch in orchestrator to monitor the gfac
 				// instances
 				zk.getChildren(ServerSettings.getSetting(
@@ -162,7 +135,24 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 		}
 	}
 
-	/**
+    /** Registers this orchestrator in ZooKeeper: ensures the persistent parent node exists, then adds an ephemeral child holding host:port. */
+    private void registerOrchestratorService(String airavataServerHostPort, String orchServer) throws KeeperException, InterruptedException {
+        Stat zkStat = zk.exists(orchServer, false);
+        if (zkStat == null) {
+            zk.create(orchServer, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        // ZooKeeper paths are always '/'-separated; File.separator would yield '\' on Windows and break the parent/child link.
+        String instantNode = orchServer + "/" + new Random().nextInt(Integer.MAX_VALUE);
+        zkStat = zk.exists(instantNode, false);
+        if (zkStat == null) {
+            // Ephemeral, so the node goes away with this ZooKeeper session.
+            zk.create(instantNode, airavataServerHostPort.getBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+    }
+
+    /**
 	 * * After creating the experiment Data user have the * experimentID as the
 	 * handler to the experiment, during the launchExperiment * We just have to
 	 * give the experimentID * * @param experimentID * @return sucess/failure *
@@ -307,7 +297,27 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 				case SyncConnected:
 					mutex.notify();
 					break;
-				}
+                case Expired:case Disconnected:
+                        try {
+                            zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
+                            String airavataServerHostPort = ServerSettings
+                                    .getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+                                    + ":"
+                                    + ServerSettings
+                                    .getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
+                            String OrchServer = ServerSettings
+                                    .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
+                            registerOrchestratorService(airavataServerHostPort, OrchServer);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        } catch (ApplicationSettingsException e) {
+                            e.printStackTrace();
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        } catch (KeeperException e) {
+                            e.printStackTrace();
+                        }
+                }
 				if (watchedEvent.getPath() != null
 						&& watchedEvent.getPath().startsWith(
 								ServerSettings.getSetting(
@@ -358,6 +368,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 						}).start();
 						break;
 					}
+
+
 				}
 			} catch (KeeperException e) {
 				e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8bb82ce/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index 7cd212a..542017c 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -39,7 +39,7 @@ public class OrchestratorContext {
 
     private Registry newRegistry;
 
-    private ZooKeeper zk;
+    private static ZooKeeper zk; // this instance can be accessed by the Validators and other components
     
     public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
         if (gfacInstanceList != null) {
@@ -81,11 +81,11 @@ public class OrchestratorContext {
         this.gfacInstanceList.addAll(gfacInstanceList);
     }
 
-    public void setZk(ZooKeeper zk) {
+    public  void setZk(ZooKeeper zk) {
         this.zk = zk;
     }
 
-    public ZooKeeper getZk() {
+    public static ZooKeeper getZk() {
         return zk;
     }
 }