You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/12/01 23:24:36 UTC
helix git commit: [HELIX-546] Add REST API for Helix job queue management - first part
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 5378afab5 -> 059ab387b
[HELIX-546] Add REST API for Helix job queue management - first part
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/059ab387
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/059ab387
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/059ab387
Branch: refs/heads/helix-0.6.x
Commit: 059ab387b7fe70e71989e6560b57667c8eda7b60
Parents: 5378afa
Author: zzhang <zz...@apache.org>
Authored: Mon Dec 1 14:23:43 2014 -0800
Committer: zzhang <zz...@apache.org>
Committed: Mon Dec 1 14:23:43 2014 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/PropertyKey.java | 1 -
.../apache/helix/examples/BootstrapProcess.java | 2 -
.../apache/helix/examples/ExampleProcess.java | 1 -
.../org/apache/helix/examples/Quickstart.java | 1 -
.../java/org/apache/helix/manager/zk/Cache.java | 1 -
.../DefaultSchedulerMessageHandlerFactory.java | 7 +-
.../helix/manager/zk/WriteThroughCache.java | 1 -
.../org/apache/helix/manager/zk/ZKUtil.java | 1 -
.../helix/manager/zk/ZkCallbackCache.java | 1 -
.../org/apache/helix/manager/zk/ZkClient.java | 1 -
.../handling/AsyncCallbackService.java | 9 --
.../helix/messaging/handling/HelixTask.java | 9 --
.../messaging/handling/MessageHandler.java | 2 -
.../java/org/apache/helix/model/HealthStat.java | 1 -
.../monitoring/mbeans/ClusterMBeanObserver.java | 7 --
.../mbeans/StateTransitionStatMonitor.java | 1 -
.../participant/HelixStateMachineEngine.java | 1 -
.../java/org/apache/helix/task/TaskDriver.java | 87 ++++++++++------
.../java/org/apache/helix/task/TaskUtil.java | 102 +++++++++++++------
.../java/org/apache/helix/task/Workflow.java | 78 +++++++-------
.../apache/helix/tools/CLMLogFileAppender.java | 5 -
.../org/apache/helix/util/ZKClientPool.java | 1 -
.../org/apache/helix/TestConfigAccessor.java | 1 -
.../helix/TestRelayIdealStateCalculator.java | 1 -
.../java/org/apache/helix/ZkTestHelper.java | 3 -
.../helix/integration/TestCustomIdealState.java | 4 -
.../integration/TestDistributedCMMain.java | 1 -
.../TestDistributedClusterController.java | 1 -
.../apache/helix/integration/TestDriver.java | 5 -
.../integration/TestNonOfflineInitState.java | 1 -
.../TestSessionExpiryInTransition.java | 2 -
.../TestStandAloneCMSessionExpiry.java | 1 -
.../manager/TestParticipantManager.java | 1 -
.../zk/TestZkCacheSyncOpSingleThread.java | 1 -
.../helix/messaging/TestAsyncCallbackSvc.java | 4 -
35 files changed, 170 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index a5a3561..663e831 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -50,7 +50,6 @@ import org.apache.helix.model.LeaderHistory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StatusUpdate;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
index 73f1175..d924fe5 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
@@ -18,7 +18,6 @@ package org.apache.helix.examples;
* specific language governing permissions and limitations
* under the License.
*/
-import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
@@ -45,7 +44,6 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.tools.ClusterStateVerifier;
/**
* This process does little more than handling the state transition messages.
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
index e023cf9..840a963 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/ExampleProcess.java
@@ -38,7 +38,6 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
public class ExampleProcess {
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
index b80d458..6773848 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
@@ -34,7 +34,6 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
index 354a9f6..1935c5f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/Cache.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 5451a81..8e4071c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -20,9 +20,6 @@ package org.apache.helix.manager.zk;
*/
import java.io.StringReader;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -52,8 +49,8 @@ import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
/*
- * The current implementation supports throttling on STATE-TRANSITION type of message, transition SCHEDULED-COMPLETED.
- *
+ * The current implementation supports throttling on STATE-TRANSITION type of message, transition SCHEDULED-COMPLETED.
+ *
*/
public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFactory {
public static final String WAIT_ALL = "WAIT_ALL";
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
index 4ad0f0f..7c991ae 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/WriteThroughCache.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.io.File;
import java.util.List;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 77c32af..c61dccd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -20,7 +20,6 @@ package org.apache.helix.manager.zk;
*/
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
index a02cedf..d1e3af3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 7d300a5..139fedd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -37,7 +37,6 @@ import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
index c218a15..46c595d 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
@@ -19,22 +19,13 @@ package org.apache.helix.messaging.handling;
* under the License.
*/
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.AsyncCallback;
-import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
-import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.log4j.Logger;
public class AsyncCallbackService implements MessageHandlerFactory {
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index d9f7ae2..cc4123a 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -19,13 +19,7 @@ package org.apache.helix.messaging.handling;
* under the License.
*/
-import java.util.ArrayList;
import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixDataAccessor;
@@ -33,12 +27,9 @@ import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
-import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
-import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
import org.apache.helix.model.Message.MessageType;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
index 96784c2..5715571 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandler.java
@@ -20,8 +20,6 @@ package org.apache.helix.messaging.handling;
*/
import org.apache.helix.NotificationContext;
-import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
-import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
import org.apache.helix.model.Message;
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/HealthStat.java b/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
index 715927f..b8ac32f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
+++ b/helix-core/src/main/java/org/apache/helix/model/HealthStat.java
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.Message.Attributes;
-import org.apache.log4j.Logger;
/**
* Represents a set of properties that can be queried to determine the health of instances on a
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
index 5922ea9..d66c1f6 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
@@ -22,20 +22,13 @@ package org.apache.helix.monitoring.mbeans;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
-import javax.management.IntrospectionException;
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanException;
-import javax.management.MBeanInfo;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerDelegate;
import javax.management.MBeanServerNotification;
import javax.management.MalformedObjectNameException;
import javax.management.Notification;
import javax.management.NotificationListener;
-import javax.management.ReflectionException;
import javax.management.relation.MBeanServerNotificationFilter;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
index 20a4a07..0198410 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
@@ -22,7 +22,6 @@ package org.apache.helix.monitoring.mbeans;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.helix.monitoring.StatCollector;
import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 039d076..3c11cdb 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -34,7 +34,6 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.messaging.handling.BatchMessageHandler;
import org.apache.helix.messaging.handling.BatchMessageWrapper;
import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
-import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.TaskExecutor;
import org.apache.helix.model.CurrentState;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index a9a3ac0..0bd060a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -39,6 +39,7 @@ import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -46,9 +47,17 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.log4j.Logger;
import com.google.common.base.Joiner;
@@ -74,7 +83,9 @@ public class TaskDriver {
/** Field for specifying a workflow file when starting a job */
private static final String WORKFLOW_FILE_OPTION = "file";
- private final HelixManager _manager;
+ private final HelixDataAccessor _accessor;
+ private final ConfigAccessor _cfgAccessor;
+ private final HelixPropertyStore<ZNRecord> _propertyStore;
private final HelixAdmin _admin;
private final String _clusterName;
@@ -88,9 +99,27 @@ public class TaskDriver {
}
public TaskDriver(HelixManager manager) {
- _manager = manager;
- _clusterName = manager.getClusterName();
- _admin = manager.getClusterManagmentTool();
+ this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager
+ .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName());
+ }
+
+ public TaskDriver(ZkClient client, String clusterName) {
+ this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName);
+ }
+
+ public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
+ this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
+ new ConfigAccessor(client), new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+ PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName), null), clusterName);
+ }
+
+ public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor,
+ HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
+ _admin = admin;
+ _accessor = accessor;
+ _cfgAccessor = cfgAccessor;
+ _propertyStore = propertyStore;
+ _clusterName = clusterName;
}
/**
@@ -173,11 +202,10 @@ public class TaskDriver {
/** Creates a new named job queue (workflow) */
public void createQueue(JobQueue queue) throws Exception {
String queueName = queue.getName();
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
HelixProperty property = new HelixProperty(queueName);
property.getRecord().getSimpleFields().putAll(queue.getResourceConfigMap());
boolean created =
- accessor.createProperty(accessor.keyBuilder().resourceConfig(queueName), property);
+ _accessor.createProperty(_accessor.keyBuilder().resourceConfig(queueName), property);
if (!created) {
throw new IllegalArgumentException("Queue " + queueName + " already exists!");
}
@@ -185,21 +213,21 @@ public class TaskDriver {
/** Flushes a named job queue */
public void flushQueue(String queueName) throws Exception {
- WorkflowConfig config = TaskUtil.getWorkflowCfg(_manager, queueName);
+ WorkflowConfig config =
+ TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, queueName);
if (config == null) {
throw new IllegalArgumentException("Queue does not exist!");
}
// Remove all ideal states and resource configs to trigger a drop event
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
final Set<String> toRemove = Sets.newHashSet(config.getJobDag().getAllNodes());
for (String resourceName : toRemove) {
- accessor.removeProperty(keyBuilder.idealStates(resourceName));
- accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
+ _accessor.removeProperty(keyBuilder.idealStates(resourceName));
+ _accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
// Delete context
String contextKey = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
- _manager.getHelixPropertyStore().remove(contextKey, AccessOption.PERSISTENT);
+ _propertyStore.remove(contextKey, AccessOption.PERSISTENT);
}
// Now atomically clear the DAG
@@ -227,7 +255,7 @@ public class TaskDriver {
return currentData;
}
};
- accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+ _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
// Now atomically clear the results
path =
@@ -243,16 +271,15 @@ public class TaskDriver {
return currentData;
}
};
- _manager.getHelixPropertyStore().update(path, updater, AccessOption.PERSISTENT);
+ _propertyStore.update(path, updater, AccessOption.PERSISTENT);
}
/** Adds a new job to the end an existing named queue */
public void enqueueJob(final String queueName, final String jobName, JobConfig.Builder jobBuilder)
throws Exception {
// Get the job queue config and capacity
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
HelixProperty workflowConfig =
- accessor.getProperty(accessor.keyBuilder().resourceConfig(queueName));
+ _accessor.getProperty(_accessor.keyBuilder().resourceConfig(queueName));
if (workflowConfig == null) {
throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
}
@@ -307,8 +334,8 @@ public class TaskDriver {
return currentData;
}
};
- String path = accessor.keyBuilder().resourceConfig(queueName).getPath();
- boolean status = accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+ String path = _accessor.keyBuilder().resourceConfig(queueName).getPath();
+ boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
if (!status) {
throw new IllegalArgumentException("Could not enqueue job");
}
@@ -327,8 +354,7 @@ public class TaskDriver {
_admin.addResource(_clusterName, jobResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
// Set the job configuration
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
HelixProperty resourceConfig = new HelixProperty(jobResource);
resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
@@ -337,7 +363,7 @@ public class TaskDriver {
resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
}
}
- accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
+ _accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
// Push out new ideal state based on number of target partitions
CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
@@ -378,8 +404,7 @@ public class TaskDriver {
setSingleWorkflowTargetState(workflowName, state);
// For recurring schedules, child workflows must also be handled
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- List<String> resources = accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+ List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
String prefix = workflowName + "_" + TaskConstants.SCHEDULED;
for (String resource : resources) {
if (resource.startsWith(prefix)) {
@@ -390,7 +415,6 @@ public class TaskDriver {
/** Helper function to change target state for a given workflow */
private void setSingleWorkflowTargetState(String workflowName, final TargetState state) {
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
@@ -405,18 +429,18 @@ public class TaskDriver {
List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
updaters.add(updater);
List<String> paths = Lists.newArrayList();
- paths.add(accessor.keyBuilder().resourceConfig(workflowName).getPath());
- accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+ paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
+ _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
invokeRebalance();
}
public void list(String resource) {
- WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
+ WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_cfgAccessor, _accessor, _clusterName, resource);
if (wCfg == null) {
LOG.error("Workflow " + resource + " does not exist!");
return;
}
- WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, resource);
LOG.info("Workflow " + resource + " consists of the following tasks: "
+ wCfg.getJobDag().getAllNodes());
@@ -430,8 +454,8 @@ public class TaskDriver {
LOG.info("Job " + job + " is " + jobState);
// fetch job information
- JobConfig jCfg = TaskUtil.getJobCfg(_manager, job);
- JobContext jCtx = TaskUtil.getJobContext(_manager, job);
+ JobConfig jCfg = TaskUtil.getJobCfg(_accessor, job);
+ JobContext jCtx = TaskUtil.getJobContext(_propertyStore, job);
if (jCfg == null || jCtx == null) {
LOG.info("-------");
continue;
@@ -472,8 +496,7 @@ public class TaskDriver {
for (String resource : _admin.getResourcesInCluster(_clusterName)) {
IdealState is = _admin.getResourceIdealState(_clusterName, resource);
if (is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- accessor.updateProperty(accessor.keyBuilder().idealStates(resource), is);
+ _accessor.updateProperty(_accessor.keyBuilder().idealStates(resource), is);
break;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 17388af..4f6afe0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -41,6 +41,7 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.store.HelixPropertyStore;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@@ -59,13 +60,13 @@ public class TaskUtil {
/**
* Parses job resource configurations in Helix into a {@link JobConfig} object.
- * @param manager HelixManager object used to connect to Helix.
- * @param jobResource The name of the job resource.
+ * @param accessor Helix data accessor used to read the job's resource config
+ * @param jobResource The name of the job resource
* @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
* otherwise.
*/
- public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
- HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
+ public static JobConfig getJobCfg(HelixDataAccessor accessor, String jobResource) {
+ HelixProperty jobResourceConfig = getResourceConfig(accessor, jobResource);
if (jobResourceConfig == null) {
return null;
}
@@ -83,14 +84,29 @@ public class TaskUtil {
}
/**
+ * Parses job resource configurations in Helix into a {@link JobConfig} object.
+ * @param manager HelixManager object used to connect to Helix.
+ * @param jobResource The name of the job resource.
+ * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
+ * otherwise.
+ */
+ public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
+ return getJobCfg(manager.getHelixDataAccessor(), jobResource);
+ }
+
+ /**
* Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
- * @param manager Helix manager object used to connect to Helix.
+ * @param cfgAccessor Config accessor used to look up the workflow resource's config keys
+ * @param accessor Helix data accessor used to read the workflow's resource config record
+ * @param clusterName Cluster name
* @param workflowResource The name of the workflow resource.
* @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
* workflow, null otherwise.
*/
- public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
- Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
+ public static WorkflowConfig getWorkflowCfg(ConfigAccessor cfgAccessor,
+ HelixDataAccessor accessor, String clusterName, String workflowResource) {
+ Map<String, String> workflowCfg =
+ getResourceConfigMap(cfgAccessor, accessor, clusterName, workflowResource);
if (workflowCfg == null) {
return null;
}
@@ -100,6 +116,18 @@ public class TaskUtil {
}
/**
+ * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
+ * @param manager Helix manager object used to connect to Helix.
+ * @param workflowResource The name of the workflow resource.
+ * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
+ * workflow, null otherwise.
+ */
+ public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
+ return getWorkflowCfg(manager.getConfigAccessor(), manager.getHelixDataAccessor(),
+ manager.getClusterName(), workflowResource);
+ }
+
+ /**
* Request a state change for a specific task.
* @param accessor connected Helix data accessor
* @param instance the instance serving the task
@@ -168,19 +196,30 @@ public class TaskUtil {
/**
* Get the runtime context of a single job
- * @param manager a connection to Helix
- * @param jobResource the name of the job
+ * @param propertyStore Property store for the cluster
+ * @param jobResource The name of the job
* @return the {@link JobContext}, or null if none is available
*/
- public static JobContext getJobContext(HelixManager manager, String jobResource) {
+ public static JobContext getJobContext(HelixPropertyStore<ZNRecord> propertyStore,
+ String jobResource) {
ZNRecord r =
- manager.getHelixPropertyStore().get(
+ propertyStore.get(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
null, AccessOption.PERSISTENT);
return r != null ? new JobContext(r) : null;
}
/**
+ * Get the runtime context of a single job
+ * @param manager a connection to Helix
+ * @param jobResource the name of the job
+ * @return the {@link JobContext}, or null if none is available
+ */
+ public static JobContext getJobContext(HelixManager manager, String jobResource) {
+ return getJobContext(manager.getHelixPropertyStore(), jobResource);
+ }
+
+ /**
* Set the runtime context of a single job
* @param manager a connection to Helix
* @param jobResource the name of the job
@@ -193,21 +232,32 @@ public class TaskUtil {
}
/**
- * Get the rumtime context of a single workflow
- * @param manager a connection to Helix
- * @param workflowResource the name of the workflow
+ * Get the runtime context of a single workflow
+ * @param propertyStore Property store of the cluster
+ * @param workflowResource The name of the workflow
* @return the {@link WorkflowContext}, or null if none is available
*/
- public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
+ public static WorkflowContext getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore,
+ String workflowResource) {
ZNRecord r =
- manager.getHelixPropertyStore().get(
+ propertyStore.get(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
CONTEXT_NODE), null, AccessOption.PERSISTENT);
return r != null ? new WorkflowContext(r) : null;
}
/**
- * Set the rumtime context of a single workflow
+ * Get the runtime context of a single workflow
+ * @param manager a connection to Helix
+ * @param workflowResource the name of the workflow
+ * @return the {@link WorkflowContext}, or null if none is available
+ */
+ public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
+ return getWorkflowContext(manager.getHelixPropertyStore(), workflowResource);
+ }
+
+ /**
+ * Set the runtime context of a single workflow
* @param manager a connection to Helix
* @param workflowResource the name of the workflow
* @param ctx the up-to-date {@link WorkflowContext} for the workflow
@@ -403,25 +453,19 @@ public class TaskUtil {
return builder.build();
}
- private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
- HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
- ConfigAccessor configAccessor = manager.getConfigAccessor();
+ private static Map<String, String> getResourceConfigMap(ConfigAccessor cfgAccessor,
+ HelixDataAccessor accessor, String clusterName, String resource) {
+ HelixConfigScope scope = getResourceConfigScope(clusterName, resource);
- Map<String, String> taskCfg = new HashMap<String, String>();
- List<String> cfgKeys = configAccessor.getKeys(scope);
+ List<String> cfgKeys = cfgAccessor.getKeys(scope);
if (cfgKeys == null || cfgKeys.isEmpty()) {
return null;
}
- for (String cfgKey : cfgKeys) {
- taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
- }
-
- return getResourceConfig(manager, resource).getRecord().getSimpleFields();
+ return getResourceConfig(accessor, resource).getRecord().getSimpleFields();
}
- private static HelixProperty getResourceConfig(HelixManager manager, String resource) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.resourceConfig(resource));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 2356c7f..1c0ef40 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -100,7 +100,7 @@ public class Workflow {
* the following
* form:
* <p/>
- *
+ *
* <pre>
* name: MyFlow
* jobs:
@@ -132,47 +132,49 @@ public class Workflow {
WorkflowBean wf = (WorkflowBean) yaml.load(reader);
Builder builder = new Builder(wf.name);
- for (JobBean job : wf.jobs) {
- if (job.name == null) {
- throw new IllegalArgumentException("A job must have a name.");
- }
+ if (wf != null) {
+ for (JobBean job : wf.jobs) {
+ if (job.name == null) {
+ throw new IllegalArgumentException("A job must have a name.");
+ }
- if (job.parents != null) {
- for (String parent : job.parents) {
- builder.addParentChildDependency(parent, job.name);
+ if (job.parents != null) {
+ for (String parent : job.parents) {
+ builder.addParentChildDependency(parent, job.name);
+ }
}
- }
- builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
- builder.addConfig(job.name, JobConfig.COMMAND, job.command);
- if (job.jobConfigMap != null) {
- builder.addJobCommandConfigMap(job.name, job.jobConfigMap);
- }
- builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
- if (job.targetPartitionStates != null) {
- builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
- Joiner.on(",").join(job.targetPartitionStates));
- }
- if (job.targetPartitions != null) {
- builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
- Joiner.on(",").join(job.targetPartitions));
- }
- builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
- String.valueOf(job.maxAttemptsPerTask));
- builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK,
- String.valueOf(job.maxForcedReassignmentsPerTask));
- builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
- String.valueOf(job.numConcurrentTasksPerInstance));
- builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
- String.valueOf(job.timeoutPerPartition));
- builder
- .addConfig(job.name, JobConfig.FAILURE_THRESHOLD, String.valueOf(job.failureThreshold));
- if (job.tasks != null) {
- List<TaskConfig> taskConfigs = Lists.newArrayList();
- for (TaskBean task : job.tasks) {
- taskConfigs.add(TaskConfig.from(task));
+ builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
+ builder.addConfig(job.name, JobConfig.COMMAND, job.command);
+ if (job.jobConfigMap != null) {
+ builder.addJobCommandConfigMap(job.name, job.jobConfigMap);
+ }
+ builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
+ if (job.targetPartitionStates != null) {
+ builder.addConfig(job.name, JobConfig.TARGET_PARTITION_STATES,
+ Joiner.on(",").join(job.targetPartitionStates));
+ }
+ if (job.targetPartitions != null) {
+ builder.addConfig(job.name, JobConfig.TARGET_PARTITIONS,
+ Joiner.on(",").join(job.targetPartitions));
+ }
+ builder.addConfig(job.name, JobConfig.MAX_ATTEMPTS_PER_TASK,
+ String.valueOf(job.maxAttemptsPerTask));
+ builder.addConfig(job.name, JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK,
+ String.valueOf(job.maxForcedReassignmentsPerTask));
+ builder.addConfig(job.name, JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+ String.valueOf(job.numConcurrentTasksPerInstance));
+ builder.addConfig(job.name, JobConfig.TIMEOUT_PER_TASK,
+ String.valueOf(job.timeoutPerPartition));
+ builder.addConfig(job.name, JobConfig.FAILURE_THRESHOLD,
+ String.valueOf(job.failureThreshold));
+ if (job.tasks != null) {
+ List<TaskConfig> taskConfigs = Lists.newArrayList();
+ for (TaskBean task : job.tasks) {
+ taskConfigs.add(TaskConfig.from(task));
+ }
+ builder.addTaskConfigs(job.name, taskConfigs);
}
- builder.addTaskConfigs(job.name, taskConfigs);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java b/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
index 326607f..4f59925 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
@@ -19,14 +19,9 @@ package org.apache.helix.tools;
* under the License.
*/
-import java.io.File;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
import java.text.SimpleDateFormat;
import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Layout;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
index 0c45020..0980e48 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
@@ -22,7 +22,6 @@ package org.apache.helix.util;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.zookeeper.ZooKeeper.States;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java b/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java
index c8514ef..d58389d 100644
--- a/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestConfigAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix;
* under the License.
*/
-import java.util.Arrays;
import java.util.Date;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java b/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
index ca58659..f81ba67 100644
--- a/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
@@ -20,7 +20,6 @@ package org.apache.helix;
*/
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index b3f6f75..d6d0ff3 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -31,12 +31,9 @@ import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
index 5467932..941ebd5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
@@ -22,12 +22,8 @@ package org.apache.helix.integration;
import java.util.Date;
import org.apache.helix.TestHelper;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.tools.ClusterSetup;
import org.apache.log4j.Logger;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestCustomIdealState extends ZkIntegrationTestBase {
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
index e3b0cc7..66715b8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
@@ -24,7 +24,6 @@ import java.util.Date;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
index 3cf4ed5..ecf0d8f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
@@ -24,7 +24,6 @@ import java.util.Date;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
index 4dad7c0..82b181a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
@@ -21,19 +21,15 @@ package org.apache.helix.integration;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
-import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -50,7 +46,6 @@ import org.apache.helix.tools.TestExecutor;
import org.apache.helix.tools.TestTrigger;
import org.apache.helix.tools.ZnodeOpArg;
import org.apache.helix.tools.TestCommand.CommandType;
-import org.apache.helix.tools.TestCommand.NodeOpArg;
import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
import org.apache.log4j.Logger;
import org.testng.Assert;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
index 734e2b4..aa48c90 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -24,7 +24,6 @@ import java.util.Date;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.MockBootstrapModelFactory;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.tools.ClusterSetup;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
index 965b8ef..d3a9b92 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -22,7 +22,6 @@ package org.apache.helix.integration;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
@@ -32,7 +31,6 @@ import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
index e8adf03..dad998d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
@@ -21,7 +21,6 @@ package org.apache.helix.integration;
import java.util.Date;
-import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.integration.manager.ClusterControllerManager;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 73945fe..b35bd13 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -43,7 +43,6 @@ import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
index 251234f..3c0f4bf 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheSyncOpSingleThread.java
@@ -26,7 +26,6 @@ import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.helix.AccessOption;
http://git-wip-us.apache.org/repos/asf/helix/blob/059ab387/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
index 2be955f..38bd43d 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestAsyncCallbackSvc.java
@@ -26,7 +26,6 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.AsyncCallbackService;
import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.TestHelixTaskExecutor.MockClusterManager;
import org.apache.helix.model.Message;
import org.testng.annotations.Test;
import org.testng.AssertJUnit;
@@ -37,9 +36,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
public class TestAsyncCallbackSvc {
class MockHelixManager extends Mocks.MockManager {
public String getSessionId() {