You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/15 20:17:20 UTC
[1/4] git commit: [HELIX-448] Call onCallback for CustomCodeCallbackHandler for FINALIZE
Repository: helix
Updated Branches:
refs/heads/master 639f2f8a6 -> a9e96ea06
[HELIX-448] Call onCallback for CustomCodeCallbackHandler for FINALIZE
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/67be36e0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/67be36e0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/67be36e0
Branch: refs/heads/master
Commit: 67be36e02cf8197c9fa19986040636b960463a3f
Parents: 639f2f8
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed May 28 15:37:48 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 15 10:35:31 2014 -0700
----------------------------------------------------------------------
.../helix/participant/CustomCodeInvoker.java | 53 +++++++++-----------
.../TestDisableCustomCodeRunner.java | 23 ++++++---
2 files changed, 41 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/67be36e0/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
index 04375f1..a736d71 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/CustomCodeInvoker.java
@@ -47,41 +47,38 @@ public class CustomCodeInvoker implements LiveInstanceChangeListener, ConfigChan
}
private void callParticipantCode(NotificationContext context) {
- // System.out.println("callback invoked. type:" + context.getType().toString());
- if (context.getType() == Type.INIT || context.getType() == Type.CALLBACK) {
- // since ZkClient.unsubscribe() does not immediately remove listeners
- // from zk, it is possible that two listeners exist when leadership transfers
- // therefore, double check to make sure only one participant invokes the code
- if (context.getType() == Type.CALLBACK) {
- HelixManager manager = context.getManager();
- // DataAccessor accessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
+ // since ZkClient.unsubscribe() does not immediately remove listeners
+ // from zk, it is possible that two listeners exist when leadership transfers
+ // therefore, double check to make sure only one participant invokes the code
+ if (context.getType() == Type.CALLBACK) {
+ HelixManager manager = context.getManager();
+ // DataAccessor accessor = manager.getDataAccessor();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
- String instance = manager.getInstanceName();
- String sessionId = manager.getSessionId();
+ String instance = manager.getInstanceName();
+ String sessionId = manager.getSessionId();
- // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
- String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
+ // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
+ String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
- CurrentState curState =
- accessor.getProperty(keyBuilder.currentState(instance, sessionId, resourceName));
- if (curState == null) {
- return;
- }
-
- String state = curState.getState(_partitionKey);
- if (state == null || !state.equalsIgnoreCase("LEADER")) {
- return;
- }
+ CurrentState curState =
+ accessor.getProperty(keyBuilder.currentState(instance, sessionId, resourceName));
+ if (curState == null) {
+ return;
}
- try {
- _callback.onCallback(context);
- } catch (Exception e) {
- LOG.error("Error invoking callback:" + _callback, e);
+ String state = curState.getState(_partitionKey);
+ if (state == null || !state.equalsIgnoreCase("LEADER")) {
+ return;
}
}
+
+ try {
+ _callback.onCallback(context);
+ } catch (Exception e) {
+ LOG.error("Error invoking callback:" + _callback, e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/67be36e0/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
index 3223e48..b9097b3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -24,9 +24,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
@@ -72,6 +72,10 @@ public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
public boolean isCallbackTypeInvoked() {
return _callbackInvokeMap.containsKey(NotificationContext.Type.CALLBACK);
}
+
+ public boolean isFinalizeTypeInvoked() {
+ return _callbackInvokeMap.containsKey(NotificationContext.Type.FINALIZE);
+ }
}
@Test
@@ -100,8 +104,7 @@ public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
new HashMap<String, MockParticipantManager>();
Map<String, HelixCustomCodeRunner> customCodeRunners =
new HashMap<String, HelixCustomCodeRunner>();
- Map<String, DummyCallback> callbacks =
- new HashMap<String, DummyCallback>();
+ Map<String, DummyCallback> callbacks = new HashMap<String, DummyCallback>();
for (int i = 0; i < N; i++) {
String instanceName = "localhost_" + (12918 + i);
@@ -113,15 +116,14 @@ public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
callbacks.put(instanceName, new DummyCallback());
customCodeRunners.get(instanceName).invoke(callbacks.get(instanceName))
- .on(ChangeType.LIVE_INSTANCE)
- .usingLeaderStandbyModel("TestParticLeader").start();
+ .on(ChangeType.LIVE_INSTANCE).usingLeaderStandbyModel("TestParticLeader").start();
participants.get(instanceName).syncStart();
}
boolean result =
ClusterStateVerifier
.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
+ clusterName));
Assert.assertTrue(result);
// Make sure callback is registered
@@ -191,9 +193,16 @@ public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
Thread.sleep(1000);
- for (DummyCallback callback : callbacks.values()) {
+ for (Map.Entry<String, DummyCallback> e : callbacks.entrySet()) {
+ String instance = e.getKey();
+ DummyCallback callback = e.getValue();
Assert.assertFalse(callback.isInitTypeInvoked());
Assert.assertFalse(callback.isCallbackTypeInvoked());
+
+ // Ensure that we were told that a leader stopped being the leader
+ if (instance.equals(leader)) {
+ Assert.assertTrue(callback.isFinalizeTypeInvoked());
+ }
}
// Remove fake instance
[3/4] git commit: [HELIX-452] Increase frequency of status update cleanup
Posted by ka...@apache.org.
[HELIX-452] Increase frequency of status update cleanup
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ff900705
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ff900705
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ff900705
Branch: refs/heads/master
Commit: ff900705fab53ebb386a91314faad7773ce964f0
Parents: 00217c6
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jun 5 10:26:13 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 15 11:12:45 2014 -0700
----------------------------------------------------------------------
.../controller/stages/BestPossibleStateCalcStage.java | 14 +++++++++-----
.../helix/manager/zk/ControllerManagerHelper.java | 2 ++
.../org/apache/helix/manager/zk/ZKHelixManager.java | 12 +++++-------
3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/ff900705/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 8c34f6b..003be3c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -81,11 +81,15 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
compute(cluster, event, resourceMap, currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
- ClusterStatusMonitor clusterStatusMonitor =
- (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
- cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+ try {
+ ClusterStatusMonitor clusterStatusMonitor =
+ (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+ if (clusterStatusMonitor != null) {
+ clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+ cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+ }
+ } catch (Exception e) {
+ logger.error("Could not update cluster status metrics!", e);
}
long endTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/helix/blob/ff900705/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index d2b520b..9a817e3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -56,10 +56,12 @@ public class ControllerManagerHelper {
new DefaultControllerMessageHandlerFactory();
_messagingService.getExecutor().registerMessageHandlerFactory(
defaultControllerMsgHandlerFactory.getMessageType(), defaultControllerMsgHandlerFactory);
+
MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
new DefaultSchedulerMessageHandlerFactory(_manager);
_messagingService.getExecutor().registerMessageHandlerFactory(
defaultSchedulerMsgHandlerFactory.getMessageType(), defaultSchedulerMsgHandlerFactory);
+
MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
new DefaultParticipantErrorMessageHandlerFactory(_manager);
_messagingService.getExecutor().registerMessageHandlerFactory(
http://git-wip-us.apache.org/repos/asf/helix/blob/ff900705/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 9aa8d6b..8d28dbd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -36,12 +36,12 @@ import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.HelixTimerTask;
-import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.InstanceType;
@@ -57,18 +57,16 @@ import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.AutoFallbackPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper.States;
public class ZKHelixManager implements HelixManager, IZkStateListener {
@@ -139,9 +137,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
@Override
public void start() {
- long initialDelay = 30 * 60 * 1000;
- long period = 120 * 60 * 1000;
- int timeThresholdNoChange = 180 * 60 * 1000;
+ long initialDelay = 0;
+ long period = 15 * 60 * 1000;
+ int timeThresholdNoChange = 15 * 60 * 1000;
if (_timer == null) {
LOG.info("Start StatusDumpTask");
[2/4] git commit: [HELIX-455] Add REST API for submitting jobs, add license header
Posted by ka...@apache.org.
[HELIX-455] Add REST API for submitting jobs, add license header
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/00217c64
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/00217c64
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/00217c64
Branch: refs/heads/master
Commit: 00217c64d21023017c78c79939ca3a999da609f8
Parents: 67be36e
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Jul 15 11:10:35 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 15 11:10:35 2014 -0700
----------------------------------------------------------------------
.../webapp/resources/WorkflowsResource.java | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/00217c64/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
index f09155b..9175530 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
@@ -1,5 +1,24 @@
package org.apache.helix.webapp.resources;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
[4/4] git commit: [HELIX-452] Increase frequency of status update cleanup, 0.7 fix
Posted by ka...@apache.org.
[HELIX-452] Increase frequency of status update cleanup, 0.7 fix
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a9e96ea0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a9e96ea0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a9e96ea0
Branch: refs/heads/master
Commit: a9e96ea069c8eb7e106f14ca3331518048dd7cdd
Parents: ff90070
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Jul 15 11:16:25 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 15 11:16:25 2014 -0700
----------------------------------------------------------------------
.../apache/helix/controller/stages/BestPossibleStateCalcStage.java | 2 +-
.../main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/a9e96ea0/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 003be3c..364c370 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -89,7 +89,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
}
} catch (Exception e) {
- logger.error("Could not update cluster status metrics!", e);
+ LOG.error("Could not update cluster status metrics!", e);
}
long endTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/helix/blob/a9e96ea0/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
index 2ab1be3..2f5f773 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -76,7 +76,7 @@ public class ZKPathDataDumpTask extends TimerTask {
String errorPath =
HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
PropertyType.ERRORS);
- dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 3);
+ dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 96);
}
// dump controller status updates
String controllerStatusUpdatePath =