You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/11/06 11:51:23 UTC

[28/30] ode git commit: Tested successfully with a two-node cluster

Tested successfully with a two-node cluster


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/8fe5546d
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/8fe5546d
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/8fe5546d

Branch: refs/heads/ODE-563
Commit: 8fe5546d6528b1b2e970971af6c077077b871561
Parents: 348ae9d
Author: suba <su...@cse.mrt.ac.lk>
Authored: Wed Aug 5 22:39:34 2015 +0530
Committer: suba <su...@cse.mrt.ac.lk>
Committed: Wed Aug 5 22:39:34 2015 +0530

----------------------------------------------------------------------
 .../src/main/webapp/WEB-INF/conf/hazelcast.xml  | 63 ++++++++++++++++++++
 .../java/org/apache/ode/axis2/ODEServer.java    |  2 +
 .../hazelcast/HazelcastClusterImpl.java         | 18 +++---
 .../hazelcast/HazelcastDeploymentLock.java      |  2 +-
 .../hazelcast/HazelcastInstanceLock.java        |  3 +-
 .../ode/scheduler/simple/SimpleScheduler.java   | 43 ++++++-------
 .../scheduler/simple/SimpleSchedulerTest.java   | 27 ++++-----
 7 files changed, 113 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml
----------------------------------------------------------------------
diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml
new file mode 100644
index 0000000..bf1e99e
--- /dev/null
+++ b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.4.xsd"
+           xmlns="http://www.hazelcast.com/schema/config"
+           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+   <network>
+        <port auto-increment="true" port-count="100">5701</port>
+        <outbound-ports>
+            <ports>0</ports>
+        </outbound-ports>
+        <reuse-address>false</reuse-address>
+        <join>
+            <multicast enabled="false">
+                <multicast-group>224.2.2.3</multicast-group>
+                <multicast-port>54327</multicast-port>
+            </multicast>
+            <tcp-ip enabled="true">
+                <member>127.0.0.1:5701</member>
+                <member>127.0.0.1:5702</member>
+            </tcp-ip>
+            <aws enabled="false">
+                <access-key>my-access-key</access-key>
+                <secret-key>my-secret-key</secret-key>
+                <region>us-west-1</region>
+                <host-header>ec2.amazonaws.com</host-header>
+                <security-group-name>hazelcast-sg</security-group-name>
+                <tag-key>type</tag-key>
+                <tag-value>hz-nodes</tag-value>
+            <multicast enabled="false">
+                <multicast-group>224.2.2.3</multicast-group>
+                <multicast-port>54327</multicast-port>
+            </multicast>
+            </aws>
+        </join>
+        <interfaces enabled="false">
+            <interface>10.10.1.*</interface>
+        </interfaces>
+        <ssl enabled="false" />
+        <socket-interceptor enabled="false" />
+    </network>
+    <partition-group enabled="false"/>
+    <map name="ODE_DEPLOYMENT_LOCK"></map>
+    <map name="ODE_PROCESS_INSTANCE_LOCK"></map>
+    <topic name="ODE_DEPLOYMENT_TOPIC"></topic>
+</hazelcast>
+
+
+

http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 4860150..0a13c4a 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -201,6 +201,7 @@ public class ODEServer {
             _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
             _clusterManager.setClusterProcessStore((ClusterProcessStore) _store);
             _clusterManager.init(_configRoot);
+           ((SimpleScheduler)_scheduler).setNodeId(_clusterManager.getNodeID());
         }
 
         try {
@@ -483,6 +484,7 @@ public class ODEServer {
         }
     }
 
+
     /**
      * Initialize the DAO.
      *

http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 9d2a554..4c5cad5 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -57,8 +57,8 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster
     private IMap<Long, Long> instance_lock_map;
     private ITopic<ProcessStoreClusterEvent> clusterDeploymentMessageTopic;
     private ClusterProcessStore _clusterProcessStore;
-    private ClusterLock<String> _hazelcastDeploymentLock;
-    private ClusterLock<Long> _hazelcastInstanceLock;
+    private HazelcastDeploymentLock hazelcastDeploymentLock;
+    private HazelcastInstanceLock hazelcastInstanceLock;
     private ClusterDeploymentMessageListener clusterDeploymentMessageListener;
     private ClusterMemberShipListener clusterMemberShipListener;
     private List<ClusterMemberListener> clusterMemberListenerList = null;
@@ -67,8 +67,11 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster
         clusterMemberShipListener = new ClusterMemberShipListener();
         clusterDeploymentMessageListener = new ClusterDeploymentMessageListener();
         clusterDeploymentMessageListener.registerClusterProcessStoreListener((ProcessStoreClusterListener)this);
+        hazelcastDeploymentLock = new HazelcastDeploymentLock();
+        hazelcastInstanceLock = new HazelcastInstanceLock();
     }
 
+
     public void init(File configRoot) {
 
         /*First,looks for the hazelcast.config system property. If it is set, its value is used as the path.
@@ -101,9 +104,8 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster
             instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
             clusterDeploymentMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
 
-            _hazelcastDeploymentLock = (ClusterLock) new HazelcastDeploymentLock(deployment_lock_map);
-            _hazelcastInstanceLock = (ClusterLock) new HazelcastInstanceLock(instance_lock_map);
-
+            hazelcastDeploymentLock.setLockMap(deployment_lock_map);
+            hazelcastInstanceLock.setLockMap(instance_lock_map);
             markAsMaster();
         }
     }
@@ -221,7 +223,7 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster
                 listener.memberElectedAsMaster(nodeID);
             }
         }
-        __log.info(isMaster);
+        __log.info("Master node: " +isMaster);
     }
 
     public boolean isMaster() {
@@ -249,11 +251,11 @@ public class HazelcastClusterImpl implements ClusterManager, ProcessStoreCluster
     }
 
     public ClusterLock<String> getDeploymentLock(){
-        return _hazelcastDeploymentLock;
+        return (ClusterLock)hazelcastDeploymentLock;
     }
 
     public ClusterLock<Long> getInstanceLock(){
-        return _hazelcastInstanceLock;
+        return (ClusterLock)hazelcastInstanceLock;
     }
 
     public List<String> getActiveNodes() {

http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java
index f36a1b4..b753305 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java
@@ -31,7 +31,7 @@ public class HazelcastDeploymentLock implements ClusterLock<String>{
 
     private IMap<String, String> _lock_map;
 
-    HazelcastDeploymentLock(IMap<String, String> lock_map) {
+    public void setLockMap(IMap<String, String> lock_map) {
         _lock_map = lock_map;
     }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java
----------------------------------------------------------------------
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java
index 1729bac..8ac11f8 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java
@@ -31,8 +31,7 @@ public class HazelcastInstanceLock implements ClusterLock<Long> {
 
     private IMap<Long, Long> _lock_map;
 
-
-    HazelcastInstanceLock(IMap<Long, Long> lock_map) {
+    public void setLockMap(IMap<Long, Long> lock_map) {
         _lock_map = lock_map;
     }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index 1da5571..517045d 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -479,10 +479,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
         // schedule immediate job loading for now!
         _todo.enqueue(new LoadImmediateTask(now));
 
-        if(!_isClusterEnabled) enqueueTasksReadnodeIds();
+        if(!_isClusterEnabled) enqueueTasksReadnodeIds(now);
 
         else {
-            if (_clusterManager.isMaster()) enqueueTasksReadnodeIds();
+            if (_clusterManager.isMaster()) enqueueTasksReadnodeIds(now);
         }
 
         _todo.start();
@@ -521,10 +521,11 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
 
     // Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified.
     public void memberElectedAsMaster(String masterId) {
-        enqueueTasksReadnodeIds();
+        long now = System.currentTimeMillis();
+        enqueueTasksReadnodeIds(now);
     }
 
-    private void enqueueTasksReadnodeIds() {
+    private void enqueueTasksReadnodeIds(long now) {
         try {
             execTransaction(new Callable<Void>() {
 
@@ -544,8 +545,6 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
 
         else _knownNodes.add(_nodeId);
 
-        long now = System.currentTimeMillis();
-
         // schedule check for stale nodes, make it random so that the nodes don't overlap.
         _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
 
@@ -815,8 +814,10 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
         final ArrayList<String> activeNodes;
 
         // for cluster mode
-        if (_isClusterEnabled && _clusterManager.isMaster()) {
-            activeNodes = (ArrayList) _clusterManager.getActiveNodes();
+        if (_isClusterEnabled) {
+            if (_clusterManager.isMaster()) {
+                activeNodes = (ArrayList) _clusterManager.getActiveNodes();
+            } else activeNodes = null;
         }
         //for standalone ODE deployments
         else {
@@ -984,24 +985,26 @@ public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberList
             ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
 
             // for cluster mode
-            if (_isClusterEnabled && _clusterManager.isMaster()) {
-                ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes();
-
-                //find stale nodes
-                knownNodes.removeAll(memberList);
-                if (knownNodes.size() != 0) {
-                    for (String nodeId : knownNodes) {
-                        _staleNodes.add(nodeId);
+            if (_isClusterEnabled) {
+                if (_clusterManager.isMaster()) {
+                    ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes();
+
+                    //find stale nodes
+                    knownNodes.removeAll(memberList);
+                    if (knownNodes.size() != 0) {
+                        for (String nodeId : knownNodes) {
+                            _staleNodes.add(nodeId);
+                        }
+                    }
+                    for (String nodeId : _staleNodes) {
+                        recoverStaleNode(nodeId);
                     }
-                }
-                for (String nodeId : _staleNodes)  {
-                    recoverStaleNode(nodeId);
                 }
             }
             // for standalone ode node
             else {
                 for (String nodeId : knownNodes) {
-                    if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId);
+                    if (!_nodeId.equals(nodeId)) recoverStaleNode(nodeId);
                 }
             }
             /*for (String nodeId : _knownNodes) {

http://git-wip-us.apache.org/repos/asf/ode/blob/8fe5546d/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
----------------------------------------------------------------------
diff --git a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
index 4c89ae9..10e86fc 100644
--- a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
+++ b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
@@ -19,27 +19,26 @@
 
 package org.apache.ode.scheduler.simple;
 
-import java.util.*;
-
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
-
 import junit.framework.Assert;
-import junit.framework.TestCase;
-
 import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Properties;
+
 public class SimpleSchedulerTest extends Assert implements JobProcessor {
 
     DelegateSupport _ds;
@@ -210,10 +209,10 @@ public class SimpleSchedulerTest extends Assert implements JobProcessor {
         _scheduler.setImmediateInterval(1000);
         _scheduler.setStaleInterval(1000);
         _scheduler.start();
-        for (int i = 0; i < 40; ++i) {
-            _scheduler.updateHeartBeat("n1");
+        /*for (int i = 0; i < 40; ++i) {
+           _scheduler.updateHeartBeat("n1");
             Thread.sleep(100);
-        }
+        }*/
 
         _scheduler.stop();
         Thread.sleep(1000);