You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/15 20:17:20 UTC

[1/4] git commit: [HELIX-448] Call onCallback for CustomCodeCallbackHandler for FINALIZE

Repository: helix
Updated Branches:
  refs/heads/master 639f2f8a6 -> a9e96ea06


[HELIX-448] Call onCallback for CustomCodeCallbackHandler for FINALIZE


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/67be36e0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/67be36e0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/67be36e0

Branch: refs/heads/master
Commit: 67be36e02cf8197c9fa19986040636b960463a3f
Parents: 639f2f8
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed May 28 15:37:48 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 15 10:35:31 2014 -0700

----------------------------------------------------------------------
 .../helix/participant/CustomCodeInvoker.java    | 53 +++++++++-----------
 .../TestDisableCustomCodeRunner.java            | 23 ++++++---
 2 files changed, 41 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/67be36e0/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
index 04375f1..a736d71 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
@@ -47,41 +47,38 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, ConfigChan
   }
 
   private void callParticipantCode(NotificationContext context) {
-    // System.out.println("callback invoked. type:" + context.getType().toString());
-    if (context.getType() == Type.INIT || context.getType() == Type.CALLBACK) {
-      // since ZkClient.unsubscribe() does not immediately remove listeners
-      // from zk, it is possible that two listeners exist when leadership transfers
-      // therefore, double check to make sure only one participant invokes the code
-      if (context.getType() == Type.CALLBACK) {
-        HelixManager manager = context.getManager();
-        // DataAccessor accessor = manager.getDataAccessor();
-        HelixDataAccessor accessor = manager.getHelixDataAccessor();
-        Builder keyBuilder = accessor.keyBuilder();
+    // since ZkClient.unsubscribe() does not immediately remove listeners
+    // from zk, it is possible that two listeners exist when leadership transfers
+    // therefore, double check to make sure only one participant invokes the code
+    if (context.getType() == Type.CALLBACK) {
+      HelixManager manager = context.getManager();
+      // DataAccessor accessor = manager.getDataAccessor();
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
 
-        String instance = manager.getInstanceName();
-        String sessionId = manager.getSessionId();
+      String instance = manager.getInstanceName();
+      String sessionId = manager.getSessionId();
 
-        // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
-        String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
+      // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
+      String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
 
-        CurrentState curState =
-            accessor.getProperty(keyBuilder.currentState(instance, sessionId, resourceName));
-        if (curState == null) {
-          return;
-        }
-
-        String state = curState.getState(_partitionKey);
-        if (state == null || !state.equalsIgnoreCase("LEADER")) {
-          return;
-        }
+      CurrentState curState =
+          accessor.getProperty(keyBuilder.currentState(instance, sessionId, resourceName));
+      if (curState == null) {
+        return;
       }
 
-      try {
-        _callback.onCallback(context);
-      } catch (Exception e) {
-        LOG.error("Error invoking callback:" + _callback, e);
+      String state = curState.getState(_partitionKey);
+      if (state == null || !state.equalsIgnoreCase("LEADER")) {
+        return;
       }
     }
+
+    try {
+      _callback.onCallback(context);
+    } catch (Exception e) {
+      LOG.error("Error invoking callback:" + _callback, e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/67be36e0/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
index 3223e48..b9097b3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -24,9 +24,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
@@ -72,6 +72,10 @@ public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
     public boolean isCallbackTypeInvoked() {
       return _callbackInvokeMap.containsKey(NotificationContext.Type.CALLBACK);
     }
+
+    public boolean isFinalizeTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.FINALIZE);
+    }
   }
 
   @Test
@@ -100,8 +104,7 @@ public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
         new HashMap<String, MockParticipantManager>();
     Map<String, HelixCustomCodeRunner> customCodeRunners =
         new HashMap<String, HelixCustomCodeRunner>();
-    Map<String, DummyCallback> callbacks =
-        new HashMap<String, DummyCallback>();
+    Map<String, DummyCallback> callbacks = new HashMap<String, DummyCallback>();
     for (int i = 0; i < N; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
@@ -113,15 +116,14 @@ public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
       callbacks.put(instanceName, new DummyCallback());
 
       customCodeRunners.get(instanceName).invoke(callbacks.get(instanceName))
-          .on(ChangeType.LIVE_INSTANCE)
-          .usingLeaderStandbyModel("TestParticLeader").start();
+          .on(ChangeType.LIVE_INSTANCE).usingLeaderStandbyModel("TestParticLeader").start();
       participants.get(instanceName).syncStart();
     }
 
     boolean result =
         ClusterStateVerifier
             .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-            clusterName));
+                clusterName));
     Assert.assertTrue(result);
 
     // Make sure callback is registered
@@ -191,9 +193,16 @@ public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
     accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
     Thread.sleep(1000);
 
-    for (DummyCallback callback : callbacks.values()) {
+    for (Map.Entry<String, DummyCallback> e : callbacks.entrySet()) {
+      String instance = e.getKey();
+      DummyCallback callback = e.getValue();
       Assert.assertFalse(callback.isInitTypeInvoked());
       Assert.assertFalse(callback.isCallbackTypeInvoked());
+
+      // Ensure that we were told that a leader stopped being the leader
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isFinalizeTypeInvoked());
+      }
     }
 
     // Remove fake instance


[3/4] git commit: [HELIX-452] Increase frequency of status update cleanup

Posted by ka...@apache.org.
[HELIX-452] Increase frequency of status update cleanup


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ff900705
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ff900705
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ff900705

Branch: refs/heads/master
Commit: ff900705fab53ebb386a91314faad7773ce964f0
Parents: 00217c6
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jun 5 10:26:13 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 15 11:12:45 2014 -0700

----------------------------------------------------------------------
 .../controller/stages/BestPossibleStateCalcStage.java | 14 +++++++++-----
 .../helix/manager/zk/ControllerManagerHelper.java     |  2 ++
 .../org/apache/helix/manager/zk/ZKHelixManager.java   | 12 +++++-------
 3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ff900705/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 8c34f6b..003be3c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -81,11 +81,15 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         compute(cluster, event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
 
-    ClusterStatusMonitor clusterStatusMonitor =
-        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-    if (clusterStatusMonitor != null) {
-      clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
-          cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+    try {
+      ClusterStatusMonitor clusterStatusMonitor =
+          (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+            cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+      }
+    } catch (Exception e) {
+      logger.error("Could not update cluster status metrics!", e);
     }
 
     long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/helix/blob/ff900705/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index d2b520b..9a817e3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -56,10 +56,12 @@ public class ControllerManagerHelper {
           new DefaultControllerMessageHandlerFactory();
       _messagingService.getExecutor().registerMessageHandlerFactory(
           defaultControllerMsgHandlerFactory.getMessageType(), defaultControllerMsgHandlerFactory);
+
       MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
           new DefaultSchedulerMessageHandlerFactory(_manager);
       _messagingService.getExecutor().registerMessageHandlerFactory(
           defaultSchedulerMsgHandlerFactory.getMessageType(), defaultSchedulerMsgHandlerFactory);
+
       MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
           new DefaultParticipantErrorMessageHandlerFactory(_manager);
       _messagingService.getExecutor().registerMessageHandlerFactory(

http://git-wip-us.apache.org/repos/asf/helix/blob/ff900705/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 9aa8d6b..8d28dbd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -36,12 +36,12 @@ import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.HelixTimerTask;
-import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.InstanceConfigChangeListener;
 import org.apache.helix.InstanceType;
@@ -57,18 +57,16 @@ import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.AutoFallbackPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper.States;
 
 public class ZKHelixManager implements HelixManager, IZkStateListener {
@@ -139,9 +137,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     @Override
     public void start() {
-      long initialDelay = 30 * 60 * 1000;
-      long period = 120 * 60 * 1000;
-      int timeThresholdNoChange = 180 * 60 * 1000;
+      long initialDelay = 0;
+      long period = 15 * 60 * 1000;
+      int timeThresholdNoChange = 15 * 60 * 1000;
 
       if (_timer == null) {
         LOG.info("Start StatusDumpTask");


[2/4] git commit: [HELIX-455] Add REST API for submitting jobs, add license header

Posted by ka...@apache.org.
[HELIX-455] Add REST API for submitting jobs, add license header


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/00217c64
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/00217c64
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/00217c64

Branch: refs/heads/master
Commit: 00217c64d21023017c78c79939ca3a999da609f8
Parents: 67be36e
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Jul 15 11:10:35 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 15 11:10:35 2014 -0700

----------------------------------------------------------------------
 .../webapp/resources/WorkflowsResource.java      | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/00217c64/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
index f09155b..9175530 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
@@ -1,5 +1,24 @@
 package org.apache.helix.webapp.resources;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;


[4/4] git commit: [HELIX-452] Increase frequency of status update cleanup, 0.7 fix

Posted by ka...@apache.org.
[HELIX-452] Increase frequency of status update cleanup, 0.7 fix


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a9e96ea0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a9e96ea0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a9e96ea0

Branch: refs/heads/master
Commit: a9e96ea069c8eb7e106f14ca3331518048dd7cdd
Parents: ff90070
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Jul 15 11:16:25 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 15 11:16:25 2014 -0700

----------------------------------------------------------------------
 .../apache/helix/controller/stages/BestPossibleStateCalcStage.java | 2 +-
 .../main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a9e96ea0/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 003be3c..364c370 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -89,7 +89,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
             cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
       }
     } catch (Exception e) {
-      logger.error("Could not update cluster status metrics!", e);
+      LOG.error("Could not update cluster status metrics!", e);
     }
 
     long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/helix/blob/a9e96ea0/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
index 2ab1be3..2f5f773 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -76,7 +76,7 @@ public class ZKPathDataDumpTask extends TimerTask {
       String errorPath =
           HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
               PropertyType.ERRORS);
-      dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 3);
+      dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 96);
     }
     // dump controller status updates
     String controllerStatusUpdatePath =