You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/11 19:13:15 UTC
[1/7] git commit: [HELIX-455] Add REST API for submitting jobs
Repository: helix
Updated Branches:
refs/heads/master 713586c42 -> 6d30c9c58
[HELIX-455] Add REST API for submitting jobs
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/208f1fa5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/208f1fa5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/208f1fa5
Branch: refs/heads/master
Commit: 208f1fa5211774778fbe2f7ba2254b7c91eaa04c
Parents: 713586c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jun 27 11:39:39 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 10:31:06 2014 -0700
----------------------------------------------------------------------
.../helix/webapp/RestAdminApplication.java | 2 +
.../webapp/resources/WorkflowsResource.java | 138 +++++++++++++++++++
2 files changed, 140 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/208f1fa5/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
index 0940c39..7e7a3b9 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
@@ -50,6 +50,7 @@ import org.apache.helix.webapp.resources.StateModelResource;
import org.apache.helix.webapp.resources.StateModelsResource;
import org.apache.helix.webapp.resources.StatusUpdateResource;
import org.apache.helix.webapp.resources.StatusUpdatesResource;
+import org.apache.helix.webapp.resources.WorkflowsResource;
import org.apache.helix.webapp.resources.ZkChildResource;
import org.apache.helix.webapp.resources.ZkPathResource;
import org.restlet.Application;
@@ -90,6 +91,7 @@ public class RestAdminApplication extends Application {
router.attach("/clusters/{clusterName}/resourceGroups", ResourceGroupsResource.class);
router.attach("/clusters/{clusterName}/resourceGroups/{resourceName}",
ResourceGroupResource.class);
+ router.attach("/clusters/{clusterName}/workflows", WorkflowsResource.class);
router.attach("/clusters/{clusterName}/instances", InstancesResource.class);
router.attach("/clusters/{clusterName}/instances/{instanceName}", InstanceResource.class);
router.attach("/clusters/{clusterName}/instances/{instanceName}/currentState/{resourceName}",
http://git-wip-us.apache.org/repos/asf/helix/blob/208f1fa5/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
new file mode 100644
index 0000000..f09155b
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
@@ -0,0 +1,138 @@
+package org.apache.helix.webapp.resources;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.webapp.RestAdminApplication;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Parameter;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+import com.google.common.collect.Lists;
+
+/**
+ * REST resource for the workflows of a cluster, attached by the router at
+ * {@code /clusters/{clusterName}/workflows}.
+ * <ul>
+ * <li>GET returns the names of the resource configs that carry workflow fields</li>
+ * <li>POST submits a YAML workflow definition and answers with the refreshed list</li>
+ * </ul>
+ * Failures are reported as a JSON error description in the response body.
+ */
+public class WorkflowsResource extends ServerResource {
+  private static final Logger LOG = Logger.getLogger(WorkflowsResource.class);
+
+  /**
+   * Registers the plain-text and JSON variants and switches off content negotiation.
+   */
+  public WorkflowsResource() {
+    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+    setNegotiated(false);
+  }
+
+  /**
+   * Lists the workflows of the cluster named by the {@code clusterName} request attribute.
+   * @return JSON listing of workflow names, or a JSON error description if the lookup failed
+   */
+  @Override
+  public Representation get() {
+    StringRepresentation presentation = null;
+    try {
+      String clusterName = (String) getRequest().getAttributes().get("clusterName");
+      presentation = getHostedEntitiesRepresentation(clusterName);
+    } catch (Exception e) {
+      String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+      presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+      LOG.error("Error in listing workflows", e);
+    }
+    return presentation;
+  }
+
+  /**
+   * Builds the JSON listing of all workflows in a cluster.
+   * @param clusterName the cluster whose resource configs are scanned
+   * @return a JSON representation of a record whose "WorkflowList" list field holds the names
+   * @throws JsonGenerationException if the record cannot be written as JSON
+   * @throws JsonMappingException if the record cannot be mapped to JSON
+   * @throws IOException if serialization fails
+   */
+  StringRepresentation getHostedEntitiesRepresentation(String clusterName)
+      throws JsonGenerationException, JsonMappingException, IOException {
+    // Get all resource configs of the cluster
+    ZkClient zkClient = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+    HelixDataAccessor accessor =
+        ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+
+    // Keep only the configs that carry both workflow fields; everything else is skipped
+    List<String> workflowNames = Lists.newArrayList();
+    for (Map.Entry<String, HelixProperty> entry : resourceConfigMap.entrySet()) {
+      Map<String, String> simpleFields = entry.getValue().getRecord().getSimpleFields();
+      if (simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
+          && simpleFields.containsKey(WorkflowConfig.DAG)) {
+        workflowNames.add(entry.getKey());
+      }
+    }
+
+    // Populate the result
+    ZNRecord hostedEntitiesRecord = new ZNRecord("Workflows");
+    hostedEntitiesRecord.setListField("WorkflowList", workflowNames);
+
+    return new StringRepresentation(
+        ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+        MediaType.APPLICATION_JSON);
+  }
+
+  /**
+   * Submits the YAML workflow carried by the request entity to the cluster named by the
+   * {@code clusterName} request attribute, then writes the refreshed workflow listing to the
+   * response. On failure the response entity is a JSON error description instead; the response
+   * status is set to {@code SUCCESS_OK} in both cases.
+   * @param entity the request entity holding the YAML workflow definition
+   * @return always {@code null}; entity and status are set directly on the response
+   */
+  @Override
+  public Representation post(Representation entity) {
+    try {
+      String clusterName = (String) getRequest().getAttributes().get("clusterName");
+      Form form = new Form(entity);
+
+      // The YAML document is read from the name of the first form parameter.
+      // NOTE(review): presumably a payload containing '&' or '=' gets split by the form
+      // parsing and only its first fragment is used here - confirm against real clients.
+      if (form.size() < 1) {
+        throw new HelixException("yaml workflow is required!");
+      }
+      Parameter payload = form.get(0);
+      String yamlPayload = payload.getName();
+      if (yamlPayload == null || yamlPayload.trim().isEmpty()) {
+        throw new HelixException("yaml workflow is required!");
+      }
+
+      // Parse before connecting so that an invalid document never opens a manager connection
+      Workflow workflow = Workflow.parse(yamlPayload);
+
+      String zkAddr =
+          (String) getContext().getAttributes().get(RestAdminApplication.ZKSERVERADDRESS);
+      HelixManager manager =
+          HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.ADMINISTRATOR,
+              zkAddr);
+      manager.connect();
+      try {
+        TaskDriver driver = new TaskDriver(manager);
+        driver.start(workflow);
+      } finally {
+        // Always release the connection, whether or not the submission succeeded
+        manager.disconnect();
+      }
+
+      getResponse().setEntity(getHostedEntitiesRepresentation(clusterName));
+      getResponse().setStatus(Status.SUCCESS_OK);
+    } catch (Exception e) {
+      // The error travels in the JSON body; the status deliberately stays unchanged from the
+      // success path so existing clients keep seeing the same status code.
+      getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
+          MediaType.APPLICATION_JSON);
+      getResponse().setStatus(Status.SUCCESS_OK);
+      LOG.error("Error in posting " + entity, e);
+    }
+    return null;
+  }
+}
[4/7] git commit: [HELIX-446] Remove ZkPropertyTransfer and restlet
dependency from helix-core
Posted by ka...@apache.org.
[HELIX-446] Remove ZkPropertyTransfer and restlet dependency from helix-core
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/85277291
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/85277291
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/85277291
Branch: refs/heads/master
Commit: 85277291607154f6d63ce0a8e401e221e55cc95c
Parents: e914edb
Author: zzhang <zz...@apache.org>
Authored: Tue May 20 17:56:01 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 11:32:43 2014 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 7 +-
.../java/org/apache/helix/PropertyType.java | 12 -
.../helix/controller/HelixControllerMain.java | 19 -
.../restlet/ZKPropertyTransferServer.java | 247 -----------
.../controller/restlet/ZNRecordUpdate.java | 83 ----
.../restlet/ZNRecordUpdateResource.java | 79 ----
.../restlet/ZkPropertyTransferApplication.java | 45 --
.../restlet/ZkPropertyTransferClient.java | 177 --------
.../helix/controller/restlet/package-info.java | 23 -
.../manager/zk/ParticipantManagerHelper.java | 1 -
.../helix/manager/zk/ZKHelixDataAccessor.java | 71 +---
.../apache/helix/manager/zk/ZKHelixManager.java | 2 -
.../manager/zk/ZNRecordStreamingSerializer.java | 6 +-
.../helix/manager/zk/ZkHelixLeaderElection.java | 10 -
.../helix/manager/zk/ZkHelixParticipant.java | 2 -
.../healthcheck/TestAlertActionTriggering.java | 224 ----------
.../helix/healthcheck/TestAlertFireHistory.java | 422 -------------------
.../helix/integration/TestAutoRebalance.java | 6 +-
.../TestAutoRebalancePartitionLimit.java | 2 +-
.../TestCustomizedIdealStateRebalancer.java | 2 +-
.../helix/integration/TestDisableNode.java | 2 +-
.../helix/integration/TestDisablePartition.java | 2 +-
.../helix/integration/TestDropResource.java | 2 +-
.../helix/integration/TestMessagingService.java | 4 +-
.../helix/integration/TestSchedulerMessage.java | 6 +-
.../TestUserDefRebalancerCompatibility.java | 3 +-
.../integration/TestZkCallbackHandlerLeak.java | 77 ++--
...dAloneCMTestBaseWithPropertyServerCheck.java | 88 ----
.../manager/TestConsecutiveZkSessionExpiry.java | 4 +-
.../TestDistributedControllerManager.java | 4 +-
.../manager/TestZkCallbackHandlerLeak.java | 76 ++--
.../manager/zk/TestLiveInstanceBounce.java | 4 +-
.../zk/TestZKPropertyTransferServer.java | 63 ---
.../manager/zk/TestZkStateChangeListener.java | 5 +-
34 files changed, 106 insertions(+), 1674 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index b8e7a9a..0d06914 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -45,7 +45,6 @@ under the License.
org.apache.zookeeper.txn*;resolution:=optional,
org.apache.zookeeper*;version="[3.3,4)",
org.codehaus.jackson*;version="[1.8,2)",
- org.restlet;version="[2.2.1,3]",
*
</osgi.import>
<osgi.ignore>
@@ -131,9 +130,9 @@ under the License.
<version>2.1</version>
</dependency>
<dependency>
- <groupId>org.restlet.jse</groupId>
- <artifactId>org.restlet</artifactId>
- <version>2.2.1</version>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 75adb20..579f454 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -85,8 +85,6 @@ public enum PropertyType {
*/
boolean isCached;
- boolean usePropertyTransferServer;
-
private PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate) {
this(type, isPersistent, mergeOnUpdate, false);
}
@@ -114,7 +112,6 @@ public enum PropertyType {
this.updateOnlyOnExists = updateOnlyOnExists;
this.createOnlyIfAbsent = createOnlyIfAbsent;
this.isCached = isCached;
- this.usePropertyTransferServer = isAsyncWrite;
}
/**
@@ -204,13 +201,4 @@ public enum PropertyType {
public boolean isCached() {
return isCached;
}
-
- /**
- * Check if this property uses a property transfer server
- * @return true if a property transfer server is used, false otherwise
- */
- public boolean usePropertyTransferServer() {
- return usePropertyTransferServer;
- }
-
}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 62f3b23..b6c16b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -47,7 +47,6 @@ import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.manager.zk.HelixManagerShutdownHook;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
@@ -58,7 +57,6 @@ public class HelixControllerMain {
public static final String cluster = "cluster";
public static final String help = "help";
public static final String mode = "mode";
- public static final String propertyTransferServicePort = "propertyTransferPort";
public static final String name = "controllerName";
public static final String STANDALONE = "STANDALONE";
public static final String DISTRIBUTED = "DISTRIBUTED";
@@ -101,19 +99,11 @@ public class HelixControllerMain {
controllerNameOption.setRequired(false);
controllerNameOption.setArgName("Cluster controller name (Optional)");
- Option portOption =
- OptionBuilder.withLongOpt(propertyTransferServicePort)
- .withDescription("Webservice port for ZkProperty controller transfer").create();
- portOption.setArgs(1);
- portOption.setRequired(false);
- portOption.setArgName("Cluster controller property transfer port (Optional)");
-
Options options = new Options();
options.addOption(helpOption);
options.addOption(zkServerOption);
options.addOption(clusterOption);
options.addOption(modeOption);
- options.addOption(portOption);
options.addOption(controllerNameOption);
return options;
@@ -208,15 +198,11 @@ public class HelixControllerMain {
String clusterName = cmd.getOptionValue(cluster);
String controllerMode = STANDALONE;
String controllerName = null;
- int propertyTransServicePort = -1;
if (cmd.hasOption(mode)) {
controllerMode = cmd.getOptionValue(mode);
}
- if (cmd.hasOption(propertyTransferServicePort)) {
- propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
- }
if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name)) {
throw new IllegalArgumentException(
"A unique cluster controller name is required in DISTRIBUTED mode");
@@ -228,10 +214,6 @@ public class HelixControllerMain {
logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
+ clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
- if (propertyTransServicePort > 0) {
- ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
- }
-
HelixManager manager =
startHelixController(zkConnectString, clusterName, controllerName, controllerMode);
@@ -244,7 +226,6 @@ public class HelixControllerMain {
+ " interrupted");
} finally {
manager.disconnect();
- ZKPropertyTransferServer.getInstance().shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
deleted file mode 100644
index 66bd257..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
+++ /dev/null
@@ -1,247 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-import org.restlet.Component;
-import org.restlet.Context;
-import org.restlet.data.Protocol;
-
-/**
- * Controller side restlet server that receives ZNRecordUpdate requests from
- * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is
- * to optimize the concurrency level of zookeeper access for ZNRecord updates
- * that does not require real-time, like message handling status updates and
- * healthcheck reports.
- * As one server will be used by multiple helix controllers that runs on the same machine,
- * This class is designed as a singleton. Application is responsible to call init()
- * and shutdown() on the getInstance().
- */
-public class ZKPropertyTransferServer {
- public static final String PORT = "port";
- public static String RESTRESOURCENAME = "ZNRecordUpdates";
- public static final String SERVER = "ZKPropertyTransferServer";
-
- // Frequency period for the ZNRecords are batch written to zookeeper
- public static int PERIOD = 10 * 1000;
- // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
- public static int MAX_UPDATE_LIMIT = 10000;
- private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
-
- int _localWebservicePort;
- String _webserviceUrl;
- ZkBaseDataAccessor<ZNRecord> _accessor;
- String _zkAddress;
-
- AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
- new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-
- boolean _initialized = false;
- boolean _shutdownFlag = false;
- Component _component = null;
- Timer _timer = null;
-
- static {
- org.restlet.engine.Engine.setLogLevel(Level.SEVERE);
- }
-
- /**
- * Timertask for zookeeper batched writes
- */
- class ZKPropertyTransferTask extends TimerTask {
- @Override
- public void run() {
- try {
- sendData();
- } catch (Throwable t) {
- LOG.error("", t);
- }
-
- }
- }
-
- void sendData() {
- LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
- ConcurrentHashMap<String, ZNRecordUpdate> updateCache = null;
-
- synchronized (_dataBufferRef) {
- updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- if (updateCache != null) {
- List<String> paths = new ArrayList<String>();
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
- List<ZNRecord> vals = new ArrayList<ZNRecord>();
- // BUGBUG : what if the instance is dropped?
- for (ZNRecordUpdate holder : updateCache.values()) {
- paths.add(holder.getPath());
- updaters.add(holder.getZNRecordUpdater());
- vals.add(holder.getRecord());
- }
- // Batch write the accumulated updates into zookeeper
- long timeStart = System.currentTimeMillis();
- if (paths.size() > 0) {
- _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
- }
- LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in "
- + (System.currentTimeMillis() - timeStart) + " ms");
- } else {
- LOG.warn("null _dataQueueRef. Should be in the beginning only");
- }
- }
-
- static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
-
- private ZKPropertyTransferServer() {
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- public static ZKPropertyTransferServer getInstance() {
- return _instance;
- }
-
- public boolean isInitialized() {
- return _initialized;
- }
-
- public void init(int localWebservicePort, String zkAddress) {
- if (!_initialized && !_shutdownFlag) {
- LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
- _localWebservicePort = localWebservicePort;
- ZkClient zkClient = new ZkClient(zkAddress);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
- _zkAddress = zkAddress;
- startServer();
- } else {
- LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: "
- + _shutdownFlag);
- }
- }
-
- public String getWebserviceUrl() {
- if (!_initialized || _shutdownFlag) {
- LOG.debug("inited:" + _initialized + " shutdownFlag:" + _shutdownFlag + " , return");
- return null;
- }
- return _webserviceUrl;
- }
-
- /**
- * Add an ZNRecordUpdate to the change queue.
- * Called by the webservice front-end.
- */
- void enqueueData(ZNRecordUpdate e) {
- if (!_initialized || _shutdownFlag) {
- LOG.error("zkDataTransferServer inited:" + _initialized + " shutdownFlag:" + _shutdownFlag
- + " , return");
- return;
- }
- // Do local merge if receive multiple update on the same path
- synchronized (_dataBufferRef) {
- e.getRecord().setSimpleField(SERVER, _webserviceUrl);
- if (_dataBufferRef.get().containsKey(e.getPath())) {
- ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
- oldVal = e.getZNRecordUpdater().update(oldVal);
- _dataBufferRef.get().get(e.getPath())._record = oldVal;
- } else {
- _dataBufferRef.get().put(e.getPath(), e);
- }
- }
- if (_dataBufferRef.get().size() > MAX_UPDATE_LIMIT) {
- sendData();
- }
- }
-
- void startServer() {
- LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress "
- + _zkAddress);
-
- _component = new Component();
-
- _component.getServers().add(Protocol.HTTP, _localWebservicePort);
- Context applicationContext = _component.getContext().createChildContext();
- applicationContext.getAttributes().put(SERVER, this);
- applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
- ZkPropertyTransferApplication application =
- new ZkPropertyTransferApplication(applicationContext);
- // Attach the application to the component and start it
- _component.getDefaultHost().attach(application);
- _timer = new Timer(true);
- _timer.schedule(new ZKPropertyTransferTask(), PERIOD, PERIOD);
-
- try {
- _webserviceUrl =
- "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":"
- + _localWebservicePort + "/" + RESTRESOURCENAME;
- _component.start();
- _initialized = true;
- } catch (Exception e) {
- LOG.error("", e);
- }
- LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress "
- + _zkAddress);
- }
-
- public void shutdown() {
- if (_shutdownFlag) {
- LOG.error("ZKPropertyTransferServer already has been shutdown...");
- return;
- }
- LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress "
- + _zkAddress);
- if (_timer != null) {
- _timer.cancel();
- }
- if (_component != null) {
- try {
- _component.stop();
- } catch (Exception e) {
- LOG.error("", e);
- }
- }
- _shutdownFlag = true;
- }
-
- public void reset() {
- if (_shutdownFlag == true) {
- _shutdownFlag = false;
- _initialized = false;
- _component = null;
- _timer = null;
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
deleted file mode 100644
index deef748..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordUpdater;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Unit of transfered ZNRecord updates. Contains the ZNRecord Value, zkPath
- * to store the update value, and the property type (used to merge the ZNRecord)
- * For ZNRecord subtraction, it is currently not supported yet.
- */
-public class ZNRecordUpdate {
- public enum OpCode {
- // TODO: create is not supported; but update will create if not exist
- CREATE,
- UPDATE,
- SET
- }
-
- final String _path;
- ZNRecord _record;
- final OpCode _code;
-
- @JsonCreator
- public ZNRecordUpdate(@JsonProperty("path") String path, @JsonProperty("opcode") OpCode code,
- @JsonProperty("record") ZNRecord record) {
- _path = path;
- _record = record;
- _code = code;
- }
-
- public String getPath() {
- return _path;
- }
-
- public ZNRecord getRecord() {
- return _record;
- }
-
- public OpCode getOpcode() {
- return _code;
- }
-
- @JsonIgnore(true)
- public DataUpdater<ZNRecord> getZNRecordUpdater() {
- if (_code == OpCode.SET)
-
- {
- return new ZNRecordUpdater(_record) {
- @Override
- public ZNRecord update(ZNRecord current) {
- return _record;
- }
- };
- } else if ((_code == OpCode.UPDATE)) {
- return new ZNRecordUpdater(_record);
- } else {
- throw new UnsupportedOperationException("Not supported : " + _code);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
deleted file mode 100644
index 3877686..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.StringReader;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.restlet.data.Form;
-import org.restlet.data.MediaType;
-import org.restlet.data.Status;
-import org.restlet.representation.Representation;
-import org.restlet.representation.Variant;
-import org.restlet.resource.ServerResource;
-
-/**
- * REST resource for ZkPropertyTransfer server to receive PUT requests
- * that submits ZNRecordUpdates
- */
-public class ZNRecordUpdateResource extends ServerResource {
- public static final String UPDATEKEY = "ZNRecordUpdate";
- private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
-
- public ZNRecordUpdateResource() {
- getVariants().add(new Variant(MediaType.TEXT_PLAIN));
- getVariants().add(new Variant(MediaType.APPLICATION_JSON));
- setNegotiated(false);
- }
-
- @Override
- public Representation put(Representation entity) {
- try {
- ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
-
- Form form = new Form(entity);
- String jsonPayload = form.getFirstValue(UPDATEKEY, true);
-
- // Parse the map from zkPath --> ZNRecordUpdate from the payload
- StringReader sr = new StringReader(jsonPayload);
- ObjectMapper mapper = new ObjectMapper();
- TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
- new TypeReference<TreeMap<String, ZNRecordUpdate>>() {
- };
- Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
- // Enqueue the ZNRecordUpdate for sending
- for (ZNRecordUpdate holder : holderMap.values()) {
- server.enqueueData(holder);
- LOG.info("Received " + holder.getPath() + " from "
- + getRequest().getClientInfo().getAddress());
- }
- getResponse().setStatus(Status.SUCCESS_OK);
- } catch (Exception e) {
- LOG.error("", e);
- getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
deleted file mode 100644
index 68d35cb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.restlet.Application;
-import org.restlet.Context;
-import org.restlet.Restlet;
-import org.restlet.routing.Router;
-
-/**
- * Restlet application for ZkPropertyTransfer server
- */
-public class ZkPropertyTransferApplication extends Application {
- public ZkPropertyTransferApplication() {
- super();
- }
-
- public ZkPropertyTransferApplication(Context context) {
- super(context);
- }
-
- @Override
- public Restlet createInboundRoot() {
- Router router = new Router(getContext());
- router.attach("/" + ZKPropertyTransferServer.RESTRESOURCENAME, ZNRecordUpdateResource.class);
- return router;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
deleted file mode 100644
index 092d845..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.restlet.Client;
-import org.restlet.Request;
-import org.restlet.Response;
-import org.restlet.data.MediaType;
-import org.restlet.data.Method;
-import org.restlet.data.Protocol;
-import org.restlet.data.Reference;
-import org.restlet.data.Status;
-
-public class ZkPropertyTransferClient {
- private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
- public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
- public static int SEND_PERIOD = 10 * 1000;
-
- public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
-
- int _maxConcurrentTasks;
- ExecutorService _executorService;
- Client[] _clients;
- AtomicInteger _requestCount = new AtomicInteger(0);
-
- // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
- AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
- new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
- Timer _timer;
- volatile String _webServiceUrl = "";
-
- static {
- org.restlet.engine.Engine.setLogLevel(Level.SEVERE);
- }
-
- public ZkPropertyTransferClient(int maxConcurrentTasks) {
- _maxConcurrentTasks = maxConcurrentTasks;
- _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
- _clients = new Client[_maxConcurrentTasks];
- for (int i = 0; i < _clients.length; i++) {
- _clients[i] = new Client(Protocol.HTTP);
- }
- _timer = new Timer(true);
- _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- class SendZNRecordTimerTask extends TimerTask {
- @Override
- public void run() {
- sendUpdateBatch();
- }
- }
-
- public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl) {
- try {
- LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to "
- + webserviceUrl);
- _webServiceUrl = webserviceUrl;
- update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
- synchronized (_dataBufferRef) {
- if (_dataBufferRef.get().containsKey(update._path)) {
- ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
- oldVal = update.getZNRecordUpdater().update(oldVal);
- _dataBufferRef.get().get(update.getPath())._record = oldVal;
- } else {
- _dataBufferRef.get().put(update.getPath(), update);
- }
- }
- } catch (Exception e) {
- LOG.error("", e);
- }
- }
-
- void sendUpdateBatch() {
- LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to "
- + _webServiceUrl);
- Map<String, ZNRecordUpdate> updateCache = null;
-
- synchronized (_dataBufferRef) {
- updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- if (updateCache != null && updateCache.size() > 0) {
- ZNRecordUpdateUploadTask task =
- new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl,
- _clients[_requestCount.intValue() % _maxConcurrentTasks]);
- _requestCount.incrementAndGet();
- _executorService.submit(task);
- LOG.trace("Queue size :" + ((ThreadPoolExecutor) _executorService).getQueue().size());
- }
- }
-
- public void shutdown() {
- LOG.info("Shutting down ZkPropertyTransferClient");
- _executorService.shutdown();
- _timer.cancel();
- for (Client client : _clients) {
- try {
- client.stop();
- } catch (Exception e) {
- LOG.error("", e);
- }
- }
- }
-
- class ZNRecordUpdateUploadTask implements Callable<Void> {
- Map<String, ZNRecordUpdate> _updateMap;
- String _webServiceUrl;
- Client _client;
-
- ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client) {
- _updateMap = update;
- _webServiceUrl = webserviceUrl;
- _client = client;
- }
-
- @Override
- public Void call() throws Exception {
- LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
- long time = System.currentTimeMillis();
- Reference resourceRef = new Reference(_webServiceUrl);
- Request request = new Request(Method.PUT, resourceRef);
-
- ObjectMapper mapper = new ObjectMapper();
- StringWriter sw = new StringWriter();
- try {
- mapper.writeValue(sw, _updateMap);
- } catch (Exception e) {
- LOG.error("", e);
- }
-
- request.setEntity(ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
- // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
- Response response = _client.handle(request);
-
- if (response.getStatus().getCode() != Status.SUCCESS_OK.getCode()) {
- LOG.error("Status : " + response.getStatus());
- }
- LOG.info("Using time : " + (System.currentTimeMillis() - time));
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
deleted file mode 100644
index 0ef7a79..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Restlet server for Helix controller
- *
- */
-package org.apache.helix.controller.restlet;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index 42c4f11..925c52f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -269,7 +269,6 @@ public class ParticipantManagerHelper {
_messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
_stateMachineEngine);
_manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
- _manager.addControllerListener(_dataAccessor);
ScheduledTaskStateModelFactory stStateModelFactory =
new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 7527751..e411a72 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -18,6 +18,7 @@ package org.apache.helix.manager.zk;
* specific language governing permissions and limitations
* under the License.
*/
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,13 +29,11 @@ import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ControllerChangeListener;
import org.apache.helix.GroupCommit;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
@@ -42,20 +41,15 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordAssembler;
import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordUpdater;
-import org.apache.helix.controller.restlet.ZNRecordUpdate;
-import org.apache.helix.controller.restlet.ZNRecordUpdate.OpCode;
-import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
-import org.apache.helix.model.LiveInstance;
import org.apache.log4j.Logger;
import org.apache.zookeeper.data.Stat;
-public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener {
+public class ZKHelixDataAccessor implements HelixDataAccessor {
private static Logger LOG = Logger.getLogger(ZKHelixDataAccessor.class);
private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
final InstanceType _instanceType;
private final String _clusterName;
private final Builder _propertyKeyBuilder;
- ZkPropertyTransferClient _zkPropertyTransferClient = null;
private final GroupCommit _groupCommit = new GroupCommit();
String _zkPropertyTransferSvcUrl = null;
@@ -100,14 +94,6 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
String path = key.getPath();
int options = constructOptions(type);
- if (type.usePropertyTransferServer()) {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null) {
- ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
- _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
- return true;
- }
- }
-
boolean success = false;
switch (type) {
case RESOURCEASSIGNMENTS:
@@ -155,20 +141,12 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
case CURRENTSTATES:
success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
break;
- default:
- if (type.usePropertyTransferServer()) {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null) {
- ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
- _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
-
- return true;
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("getPropertyTransferUrl is null, skip updating the value");
- }
- return true;
- }
+ case STATUSUPDATES:
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Update status. path: " + key.getPath() + ", record: " + value.getRecord());
}
+ break;
+ default:
success = _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
break;
}
@@ -487,39 +465,4 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
List<DataUpdater<ZNRecord>> updaters, int options) {
return _baseDataAccessor.updateChildren(paths, updaters, options);
}
-
- public void shutdown() {
- if (_zkPropertyTransferClient != null) {
- _zkPropertyTransferClient.shutdown();
- }
- }
-
- @Override
- public void onControllerChange(NotificationContext changeContext) {
- LOG.info("Controller has changed");
- refreshZkPropertyTransferUrl();
- if (_zkPropertyTransferClient == null) {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0) {
- LOG.info("Creating ZkPropertyTransferClient as we get url " + _zkPropertyTransferSvcUrl);
- _zkPropertyTransferClient =
- new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
- }
- }
- }
-
- void refreshZkPropertyTransferUrl() {
- try {
- LiveInstance leader = getProperty(keyBuilder().controllerLeader());
- if (leader != null) {
- _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
- LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl + " Controller "
- + leader.getInstanceName());
- } else {
- _zkPropertyTransferSvcUrl = null;
- }
- } catch (Exception e) {
- // LOG.error("", e);
- _zkPropertyTransferSvcUrl = null;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index b71304a..d446430 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -548,8 +548,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
// TODO reset user defined handlers only
resetHandlers();
- _dataAccessor.shutdown();
-
if (_leaderElectionHandler != null) {
_leaderElectionHandler.reset();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index bd37d42..2d7cb3c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.codec.binary.Base64;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.log4j.Logger;
@@ -35,7 +36,6 @@ import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
-import org.restlet.engine.util.Base64;
import com.google.common.collect.Maps;
@@ -141,7 +141,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
if (rawPayload != null && rawPayload.length > 0) {
// write rawPayload
g.writeRaw("\n ");
- g.writeStringField("rawPayload", Base64.encode(rawPayload, false));
+ g.writeStringField("rawPayload", new String(Base64.encodeBase64(rawPayload), "UTF-8"));
}
g.writeRaw("\n");
@@ -226,7 +226,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
}
} else if ("rawPayload".equals(fieldname)) {
- rawPayload = Base64.decode(jp.getText());
+ rawPayload = Base64.decodeBase64(jp.getText());
} else {
throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 88ca1b0..df8b5c9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -31,7 +31,6 @@ import org.apache.helix.PropertyType;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.model.LeaderHistory;
import org.apache.helix.model.LiveInstance;
import org.apache.log4j.Logger;
@@ -134,15 +133,6 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
leader.setSessionId(manager.getSessionId());
leader.setHelixVersion(manager.getVersion());
- if (ZKPropertyTransferServer.getInstance() != null) {
- String zkPropertyTransferServiceUrl =
- ZKPropertyTransferServer.getInstance().getWebserviceUrl();
- if (zkPropertyTransferServiceUrl != null) {
- leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
- }
- } else {
- LOG.warn("ZKPropertyTransferServer instnace is null");
- }
boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
if (success) {
return true;
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index 39ded2c..0c6e772 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -353,7 +353,6 @@ public class ZkHelixParticipant implements HelixParticipant {
*/
_connection.addMessageListener(this, _messagingService.getExecutor(), _clusterId,
_participantId);
- _connection.addControllerListener(this, _accessor, _clusterId);
ScheduledTaskStateModelFactory stStateModelFactory =
new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
@@ -432,7 +431,6 @@ public class ZkHelixParticipant implements HelixParticipant {
* transition?
*/
_messagingService.getExecutor().shutdown();
- _accessor.shutdown();
/**
* remove live instance ephemeral znode
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
deleted file mode 100644
index 37f8205..0000000
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ /dev/null
@@ -1,224 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
- private static Logger LOG = Logger.getLogger(TestAlertActionTriggering.class);
-
- String _statName = "TestStat@DB=db1";
- String _stat = "TestStat";
- String metricName1 = "TestMetric1";
- String metricName2 = "TestMetric2";
-
- void setHealthData(int[] val1, int[] val2) {
- for (int i = 0; i < NODE_NR; i++) {
- HelixManager manager = _participants[i];
- ZNRecord record = new ZNRecord(_stat);
- Map<String, String> valMap = new HashMap<String, String>();
- valMap.put(metricName1, val1[i] + "");
- valMap.put(metricName2, val2[i] + "");
- record.setSimpleField("TimeStamp", new Date().getTime() + "");
- record.setMapField(_statName, valMap);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- helixDataAccessor.setProperty(
- keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
- new HealthStat(record));
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.error("sleep interrupted", e);
- }
- }
-
- void setHealthData2(int[] val1) {
- for (int i = 0; i < NODE_NR; i++) {
- HelixManager manager = _participants[i];
- ZNRecord record = new ZNRecord(_stat);
- Map<String, String> valMap = new HashMap<String, String>();
- valMap.put(metricName2, val1[i] + "");
- record.setSimpleField("TimeStamp", new Date().getTime() + "");
- record.setMapField("TestStat@DB=TestDB;Partition=TestDB_3", valMap);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- helixDataAccessor.setProperty(
- keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
- new HealthStat(record));
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.error("sleep interrupted", e);
- }
- }
-
- @Test
- public void testAlertActionDisableNode() throws InterruptedException {
- // ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
- HelixConfigScope scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
- Map<String, String> properties = new HashMap<String, String>();
- properties.put("healthChange.enabled", "true");
- _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
- String alertStr1 =
- "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)ACTION(DISABLE_INSTANCE)";
- String alertStr2 =
- "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(120)ACTION(DISABLE_INSTANCE)";
- String alertStr3 =
- "EXP(decay(1.0)(localhost_*.TestStat@DB=TestDB;Partition=*.TestMetric2))CMP(GREATER)CON(160)ACTION(DISABLE_PARTITION)";
-
- _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr1);
- _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr2);
- _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr3);
-
- int[] metrics1 = {
- 10, 15, 22, 12, 16
- };
- int[] metrics2 = {
- 22, 115, 22, 163, 16
- };
- int[] metrics3 = {
- 0, 0, 0, 0, 0
- };
- setHealthData(metrics1, metrics2);
-
- HelixManager manager = _controller;
-
- HealthStatsAggregator task = new HealthStatsAggregator(manager);
- task.aggregate();
- Thread.sleep(4000);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
-
- Builder kb = manager.getHelixDataAccessor().keyBuilder();
- ExternalView externalView =
- manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
- // Test the DISABLE_INSTANCE alerts
- String participant1 = "localhost_" + (START_PORT + 3);
- String participant2 = "localhost_" + (START_PORT + 2);
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- // scope = new
- // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
- scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
- .forCluster(manager.getClusterName()).forParticipant(participant1).build();
- String isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
- Assert.assertFalse(Boolean.parseBoolean(isEnabled));
-
- // scope = new
- // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant2).build();
- scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
- .forCluster(manager.getClusterName()).forParticipant(participant2).build();
- isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
- Assert.assertFalse(Boolean.parseBoolean(isEnabled));
-
- for (String partitionName : externalView.getRecord().getMapFields().keySet()) {
- for (String hostName : externalView.getRecord().getMapField(partitionName).keySet()) {
- if (hostName.equals(participant1) || hostName.equals(participant2)) {
- Assert.assertEquals(externalView.getRecord().getMapField(partitionName).get(hostName),
- "OFFLINE");
- }
- }
- }
-
- // enable the disabled instances
- setHealthData(metrics3, metrics3);
- task.aggregate();
- Thread.sleep(1000);
-
- manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant2, true);
- manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant1, true);
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
-
- // Test the DISABLE_PARTITION case
- int[] metrics4 = {
- 22, 115, 22, 16, 163
- };
- setHealthData2(metrics4);
- task.aggregate();
-
- // scope = new
- // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
- scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
- .forCluster(manager.getClusterName()).forParticipant(participant1).build();
- isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
- Assert.assertTrue(Boolean.parseBoolean(isEnabled));
-
- // scope = new
- // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant2).build();
- scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
- .forCluster(manager.getClusterName()).forParticipant(participant2).build();
- isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
- Assert.assertTrue(Boolean.parseBoolean(isEnabled));
-
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
- String participant3 = "localhost_" + (START_PORT + 4);
- externalView = manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
- Assert.assertTrue(externalView.getRecord().getMapField("TestDB_3").get(participant3)
- .equalsIgnoreCase("OFFLINE"));
-
- InstanceConfig nodeConfig =
- helixDataAccessor.getProperty(keyBuilder.instanceConfig(participant3));
- Assert.assertTrue(nodeConfig.getRecord()
- .getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString())
- .contains("TestDB_3"));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
deleted file mode 100644
index 798ce92..0000000
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ /dev/null
@@ -1,422 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-import org.apache.helix.model.AlertHistory;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-/**
- * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
- * start 5 dummy participants verify the current states at end
- */
-
-public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
- private final static Logger LOG = Logger.getLogger(TestAlertFireHistory.class);
-
- String _statName = "TestStat@DB=db1";
- String _stat = "TestStat";
- String metricName1 = "TestMetric1";
- String metricName2 = "TestMetric2";
-
- String _alertStr1 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)";
- String _alertStr2 =
- "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(100)";
-
- void setHealthData(int[] val1, int[] val2) {
- for (int i = 0; i < NODE_NR; i++) {
- HelixManager manager = _participants[i];
- ZNRecord record = new ZNRecord(_stat);
- Map<String, String> valMap = new HashMap<String, String>();
- valMap.put(metricName1, val1[i] + "");
- valMap.put(metricName2, val2[i] + "");
- record.setSimpleField("TimeStamp", new Date().getTime() + "");
- record.setMapField(_statName, valMap);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- helixDataAccessor.setProperty(
- keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
- new HealthStat(record));
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.error("Interrupted sleep", e);
- }
- }
-
- @Test
- public void testAlertDisable() throws InterruptedException {
-
- int[] metrics1 = {
- 10, 15, 22, 24, 16
- };
- int[] metrics2 = {
- 22, 115, 22, 141, 16
- };
- setHealthData(metrics1, metrics2);
-
- HelixManager manager = _controller;
- manager.startTimerTasks();
-
- _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
- _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
-
- HelixConfigScope scope =
- new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
- Map<String, String> properties = new HashMap<String, String>();
- properties.put("healthChange.enabled", "false");
- _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
- HealthStatsAggregator task = new HealthStatsAggregator(_controller);
-
- task.aggregate();
- Thread.sleep(100);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
-
- AlertHistory history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
-
- Assert.assertEquals(history, null);
-
- properties.put("healthChange.enabled", "true");
- _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
- task.aggregate();
- Thread.sleep(100);
-
- history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
- //
- Assert.assertNotNull(history);
- Assert.assertEquals(history.getRecord().getMapFields().size(), 1);
- }
-
- @Test
- public void testAlertHistory() throws InterruptedException {
- int[] metrics1 = {
- 10, 15, 22, 24, 16
- };
- int[] metrics2 = {
- 22, 115, 22, 141, 16
- };
- setHealthData(metrics1, metrics2);
-
- HelixManager manager = _controller;
- for (HelixTimerTask task : _controller.getControllerTimerTasks()) {
- task.stop();
- }
-
- _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
- _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
-
- int historySize = 0;
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- HelixProperty property = helixDataAccessor.getProperty(keyBuilder.alertHistory());
- ZNRecord history = null;
- if (property != null) {
- history = property.getRecord();
- historySize = property.getRecord().getMapFields().size();
- }
-
- HealthStatsAggregator task = new HealthStatsAggregator(_controller);
-
- task.aggregate();
- Thread.sleep(100);
-
- history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
- //
- Assert.assertEquals(history.getMapFields().size(), 1 + historySize);
- TreeMap<String, Map<String, String>> recordMap = new TreeMap<String, Map<String, String>>();
- recordMap.putAll(history.getMapFields());
- Map<String, String> lastRecord = recordMap.firstEntry().getValue();
- Assert.assertTrue(lastRecord.size() == 4);
- Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("ON"));
-
- setHealthData(metrics1, metrics2);
- task.aggregate();
- Thread.sleep(100);
- history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
- // no change
- Assert.assertEquals(history.getMapFields().size(), 1 + historySize);
- recordMap = new TreeMap<String, Map<String, String>>();
- recordMap.putAll(history.getMapFields());
- lastRecord = recordMap.firstEntry().getValue();
- Assert.assertTrue(lastRecord.size() == 4);
- Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("ON"));
-
- int[] metrics3 = {
- 21, 44, 22, 14, 16
- };
- int[] metrics4 = {
- 122, 115, 222, 41, 16
- };
- setHealthData(metrics3, metrics4);
- task.aggregate();
- Thread.sleep(100);
- history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
- // new delta should be recorded
- Assert.assertEquals(history.getMapFields().size(), 2 + historySize);
- recordMap = new TreeMap<String, Map<String, String>>();
- recordMap.putAll(history.getMapFields());
- lastRecord = recordMap.lastEntry().getValue();
- Assert.assertEquals(lastRecord.size(), 6);
- Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("OFF"));
- Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("OFF"));
- Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("ON"));
- Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("ON"));
-
- int[] metrics5 = {
- 0, 0, 0, 0, 0
- };
- int[] metrics6 = {
- 0, 0, 0, 0, 0
- };
- setHealthData(metrics5, metrics6);
- task.aggregate();
-
- for (int i = 0; i < 10; i++) {
- Thread.sleep(500);
- history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
- recordMap = new TreeMap<String, Map<String, String>>();
- recordMap.putAll(history.getMapFields());
- lastRecord = recordMap.lastEntry().getValue();
-
- if (history.getMapFields().size() == 3 + historySize && lastRecord.size() == 6) {
- break;
- }
- }
-
- // reset everything
- Assert.assertEquals(history.getMapFields().size(), 3 + historySize,
- "expect history-map-field size is " + (3 + historySize) + ", but was " + history);
- Assert
- .assertTrue(lastRecord.size() == 6, "expect last-record size is 6, but was " + lastRecord);
-
- Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("OFF"));
- Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("OFF"));
- Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("OFF"));
- Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("OFF"));
- Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)")
- .equals("OFF"));
- Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)")
- .equals("OFF"));
-
- // Size of the history should be 30
- for (int i = 0; i < 27; i++) {
- int x = i % 2;
- int y = (i + 1) % 2;
- int[] metricsx = {
- 19 + 3 * x, 19 + 3 * y, 19 + 4 * x, 18 + 4 * y, 17 + 5 * y
- };
- int[] metricsy = {
- 99 + 3 * x, 99 + 3 * y, 98 + 4 * x, 98 + 4 * y, 97 + 5 * y
- };
-
- setHealthData(metricsx, metricsy);
- task.aggregate();
- Thread.sleep(100);
- history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-
- Assert.assertEquals(history.getMapFields().size(), Math.min(3 + i + 1 + historySize, 30));
- recordMap = new TreeMap<String, Map<String, String>>();
- recordMap.putAll(history.getMapFields());
- lastRecord = recordMap.lastEntry().getValue();
- if (i == 0) {
- Assert.assertTrue(lastRecord.size() == 6);
- Assert.assertTrue(lastRecord
- .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- } else {
- System.out.println(lastRecord.size());
- Assert.assertEquals(lastRecord.size(), 10);
- if (x == 0) {
- Assert.assertTrue(lastRecord.get(
- "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- } else {
- Assert.assertTrue(lastRecord.get(
- "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- }
- }
- }
- // size limit is 30
- for (int i = 0; i < 10; i++) {
- int x = i % 2;
- int y = (i + 1) % 2;
- int[] metricsx = {
- 19 + 3 * x, 19 + 3 * y, 19 + 4 * x, 18 + 4 * y, 17 + 5 * y
- };
- int[] metricsy = {
- 99 + 3 * x, 99 + 3 * y, 98 + 4 * x, 98 + 4 * y, 97 + 5 * y
- };
-
- setHealthData(metricsx, metricsy);
- task.aggregate();
- for (int j = 0; j < 10; j++) {
- Thread.sleep(100);
- history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
- recordMap = new TreeMap<String, Map<String, String>>();
- recordMap.putAll(history.getMapFields());
- lastRecord = recordMap.lastEntry().getValue();
-
- if (history.getMapFields().size() == 30 && lastRecord.size() == 10)
- break;
- }
- Assert.assertEquals(history.getMapFields().size(), 30,
- "expect history.map-field size is 30, but was " + history);
- Assert.assertEquals(lastRecord.size(), 10, "expect last-record size is 10, but was "
- + lastRecord);
-
- if (x == 0) {
- Assert.assertTrue(lastRecord.get(
- "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- } else {
- Assert.assertTrue(lastRecord.get(
- "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
- Assert.assertTrue(lastRecord.get(
- "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
- Assert.assertTrue(lastRecord
- .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index cc43ecb..330b906 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -30,8 +30,6 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -46,13 +44,11 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
-import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
- private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
+public class TestAutoRebalance extends ZkStandAloneCMTestBase {
String db2 = TEST_DB + "2";
String _tag = "SSDSSD";
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 5f9f48c..59e1b27 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -48,7 +48,7 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class
.getName());
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 7bf4f94..33b95c1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -57,7 +57,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class TestCustomizedIdealStateRebalancer extends
- ZkStandAloneCMTestBaseWithPropertyServerCheck {
+ ZkStandAloneCMTestBase {
String db2 = TEST_DB + "2";
static boolean testRebalancerCreated = false;
static boolean testRebalancerInvoked = false;
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
index be523d0..ca54fb3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
@@ -25,7 +25,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestDisableNode extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDisableNode extends ZkStandAloneCMTestBase {
@Test()
public void testDisableNode() throws Exception {
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
index ba7e8e4..c81f6e0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDisablePartition extends ZkStandAloneCMTestBase {
private static Logger LOG = Logger.getLogger(TestDisablePartition.class);
@Test()
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
index 0d02d12..047ea75 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
@@ -26,7 +26,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDropResource extends ZkStandAloneCMTestBase {
@Test()
public void testDropResource() throws Exception {
// add a resource to be dropped
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 86f1ce4..7df9e8b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -38,7 +38,7 @@ import org.apache.helix.model.Message.MessageType;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
-public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestMessagingService extends ZkStandAloneCMTestBase {
public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
public static HashSet<String> _processedMsgIds = new HashSet<String>();
@@ -168,8 +168,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
.get("ReplyMessage"));
if (message.getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString())
.get("ReplyMessage") == null) {
- int x = 0;
- x++;
}
_replyedMessageContents.add(message.getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage"));
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index d78bd9d..e6117d6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -64,7 +64,7 @@ import org.codehaus.jackson.map.SerializationConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
class MockAsyncCallback extends AsyncCallback {
Message _message;
@@ -386,7 +386,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
PropertyType.STATUSUPDATES);
subPaths = _gZkClient.getChildren(instanceStatusPath);
- Assert.assertTrue(subPaths.size() > 0);
+ Assert.assertTrue(subPaths.size() == 0);
for (String subPath : subPaths) {
String nextPath = instanceStatusPath + "/" + subPath;
List<String> subsubPaths = _gZkClient.getChildren(nextPath);
@@ -409,7 +409,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
subPaths = _gZkClient.getChildren(instanceStatusPath);
- Assert.assertTrue(subPaths.size() > 0);
+ Assert.assertTrue(subPaths.size() == 0);
for (String subPath : subPaths) {
String nextPath = instanceStatusPath + "/" + subPath;
List<String> subsubPaths = _gZkClient.getChildren(nextPath);
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
index 2a026c2..e7cef4f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
@@ -37,8 +37,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
@SuppressWarnings("deprecation")
-public class TestUserDefRebalancerCompatibility extends
- ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestUserDefRebalancerCompatibility extends ZkStandAloneCMTestBase {
String db2 = TEST_DB + "2";
static boolean testRebalancerCreated = false;
static boolean testRebalancerInvoked = false;
[5/7] git commit: [HELIX-444] add per-participant partition count
gauges to helix, rb=21419
Posted by ka...@apache.org.
[HELIX-444] add per-participant partition count gauges to helix, rb=21419
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96aef71c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96aef71c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96aef71c
Branch: refs/heads/master
Commit: 96aef71c899dc1f3956e1211fc1e9a7459a258d1
Parents: 8527729
Author: zzhang <zz...@apache.org>
Authored: Wed May 21 15:56:57 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 14:53:41 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/api/State.java | 6 +-
.../stages/BestPossibleStateCalcStage.java | 9 +
.../controller/stages/ClusterDataCache.java | 42 +---
.../stages/ExternalViewComputeStage.java | 23 +-
.../monitoring/mbeans/ClusterStatusMonitor.java | 182 +++++++++++++--
.../monitoring/mbeans/InstanceMonitor.java | 4 +-
.../mbeans/PerInstanceResourceMonitor.java | 147 +++++++++++++
.../mbeans/PerInstanceResourceMonitorMBean.java | 34 +++
.../monitoring/mbeans/ResourceMonitor.java | 23 +-
.../TestClusterStatusMonitorLifecycle.java | 42 ++--
.../mbeans/TestClusterStatusMonitor.java | 220 ++++++++++++-------
.../monitoring/mbeans/TestResourceMonitor.java | 59 ++---
12 files changed, 586 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/api/State.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/State.java b/helix-core/src/main/java/org/apache/helix/api/State.java
index 3315987..aa98df2 100644
--- a/helix-core/src/main/java/org/apache/helix/api/State.java
+++ b/helix-core/src/main/java/org/apache/helix/api/State.java
@@ -47,9 +47,11 @@ public class State {
@Override
public boolean equals(Object that) {
if (that instanceof State) {
- return this.toString().equals(((State) that).toString());
+ return this.toString().equalsIgnoreCase(((State) that).toString());
} else if (that instanceof String) {
- return _state.equals(that);
+ return _state.equalsIgnoreCase(that.toString());
+ } else if (that instanceof HelixDefinedState) {
+ return _state.equalsIgnoreCase(that.toString());
}
return false;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 644b9f6..2f93b7f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -42,6 +42,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
@@ -69,6 +70,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
Cluster cluster = event.getAttribute("Cluster");
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
if (currentStateOutput == null || resourceMap == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
@@ -79,6 +81,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
compute(cluster, event, resourceMap, currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
+ ClusterStatusMonitor clusterStatusMonitor =
+ (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+ if (clusterStatusMonitor != null) {
+ clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+ cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+ }
+
long endTime = System.currentTimeMillis();
if (LOG.isInfoEnabled()) {
LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 0c28bdf..8bcfaae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -79,12 +79,6 @@ public class ClusterDataCache {
boolean _init = true;
- // Map<String, Map<String, HealthStat>> _healthStatMap;
- // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
- // private PersistentStats _persistentStats;
- // private Alerts _alerts;
- // private AlertStatus _alertStatus;
-
private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName());
/**
@@ -334,37 +328,6 @@ public class ClusterDataCache {
return _messageMap;
}
- // public HealthStat getGlobalStats()
- // {
- // return _globalStats;
- // }
- //
- // public PersistentStats getPersistentStats()
- // {
- // return _persistentStats;
- // }
- //
- // public Alerts getAlerts()
- // {
- // return _alerts;
- // }
- //
- // public AlertStatus getAlertStatus()
- // {
- // return _alertStatus;
- // }
- //
- // public Map<String, HealthStat> getHealthStats(String instanceName)
- // {
- // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
- // if (map != null)
- // {
- // return map;
- // } else
- // {
- // return Collections.emptyMap();
- // }
- // }
/**
* Provides the state model definition for a given state model
* @param stateModelDefRef
@@ -375,8 +338,13 @@ public class ClusterDataCache {
}
/**
+<<<<<<< HEAD
* Get all state model definitions
* @return map of name to state model definition
+=======
+ * Provides all state model definitions
+ * @return state model definition map
+>>>>>>> 8d5c27c... [HELIX-444] add per-participant partition count gauges to helix, rb=21419
*/
public Map<String, StateModelDefinition> getStateModelDefMap() {
return _stateModelDefMap;
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index a46acbd..3086a83 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -35,7 +35,6 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordDelta;
import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.config.SchedulerTaskConfig;
@@ -46,6 +45,7 @@ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
@@ -66,10 +66,11 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
Cluster cluster = event.getAttribute("Cluster");
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- if (manager == null || resourceMap == null || cluster == null) {
+ if (manager == null || resourceMap == null || cluster == null || cache == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires ClusterManager|RESOURCES|Cluster");
+ + ". Requires ClusterManager|RESOURCES|Cluster|ClusterDataCache");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
@@ -118,15 +119,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
// Update cluster status monitor mbean
ClusterStatusMonitor clusterStatusMonitor =
(ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
- if (clusterStatusMonitor != null && currentResource != null) {
- IdealState idealState = currentResource.getIdealState();
- if (idealState != null) {
- StateModelDefId stateModelDefId = idealState.getStateModelDefId();
- if (stateModelDefId != null
- && !stateModelDefId.equals(StateModelDefId.SchedulerTaskQueue)) {
- clusterStatusMonitor.onExternalViewChange(view, idealState);
- }
+ IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+ if (idealState != null) {
+ if (clusterStatusMonitor != null
+ && !idealState.getStateModelDefRef().equalsIgnoreCase(
+ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ clusterStatusMonitor.setResourceStatus(view,
+ cache._idealStateMap.get(view.getResourceName()));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index b468856..99dee75 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -32,8 +33,18 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
import com.google.common.collect.Sets;
@@ -41,15 +52,15 @@ import com.google.common.collect.Sets;
public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private static final Logger LOG = Logger.getLogger(ClusterStatusMonitor.class);
- static final String CLUSTER_STATUS_KEY = "ClusterStatus";
+ public static final String CLUSTER_STATUS_KEY = "ClusterStatus";
static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus";
static final String RESOURCE_STATUS_KEY = "ResourceStatus";
- static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
+ public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
static final String CLUSTER_DN_KEY = "cluster";
static final String RESOURCE_DN_KEY = "resourceName";
static final String INSTANCE_DN_KEY = "instanceName";
- static final String DEFAULT_TAG = "DEFAULT";
+ public static final String DEFAULT_TAG = "DEFAULT";
private final String _clusterName;
private final MBeanServer _beanServer;
@@ -68,20 +79,27 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
new ConcurrentHashMap<String, InstanceMonitor>();
+ /**
+ * PerInstanceResource bean map: beanName->bean
+ */
+ private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
+ new ConcurrentHashMap<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>();
+
public ClusterStatusMonitor(String clusterName) {
_clusterName = clusterName;
_beanServer = ManagementFactory.getPlatformMBeanServer();
try {
- register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+ register(this, getObjectName(clusterBeanName()));
} catch (Exception e) {
- LOG.error("Register self failed.", e);
+ LOG.error("Fail to regiter ClusterStatusMonitor", e);
}
}
public ObjectName getObjectName(String name) throws MalformedObjectNameException {
- return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
+ return new ObjectName(String.format("%s: %s", CLUSTER_STATUS_KEY, name));
}
+ // TODO remove getBeanName()?
// Used by other external JMX consumers like ingraph
public String getBeanName() {
return CLUSTER_STATUS_KEY + " " + _clusterName;
@@ -144,10 +162,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
try {
- LOG.info("Registering " + name.toString());
+ LOG.info("Register MBean: " + name);
_beanServer.registerMBean(bean, name);
} catch (Exception e) {
- LOG.warn("Could not register MBean" + name, e);
+ LOG.warn("Could not register MBean: " + name, e);
}
}
@@ -158,7 +176,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
_beanServer.unregisterMBean(name);
}
} catch (Exception e) {
- LOG.warn("Could not unregister MBean" + name, e);
+ LOG.warn("Could not unregister MBean: " + name, e);
}
}
@@ -227,28 +245,98 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- public void onExternalViewChange(ExternalView externalView, IdealState idealState) {
+ /**
+ * Update gauges for resource at instance level
+ * @param bestPossibleStates
+ * @param resourceMap
+ * @param stateModelDefMap
+ */
+ public void setPerInstanceResourceStatus(BestPossibleStateOutput bestPossibleStates,
+ Map<String, InstanceConfig> instanceConfigMap, Map<ResourceId, ResourceConfig> resourceMap,
+ Map<String, StateModelDefinition> stateModelDefMap) {
+
+ // Convert to perInstanceResource beanName->partition->state
+ Map<PerInstanceResourceMonitor.BeanName, Map<PartitionId, State>> beanMap =
+ new HashMap<PerInstanceResourceMonitor.BeanName, Map<PartitionId, State>>();
+ for (ResourceId resource : bestPossibleStates.getAssignedResources()) {
+ ResourceAssignment assignment = bestPossibleStates.getResourceAssignment(resource);
+ for (PartitionId partition : assignment.getMappedPartitionIds()) {
+ Map<ParticipantId, State> instanceStateMap = assignment.getReplicaMap(partition);
+ for (ParticipantId instance : instanceStateMap.keySet()) {
+ State state = instanceStateMap.get(instance);
+ PerInstanceResourceMonitor.BeanName beanName =
+ new PerInstanceResourceMonitor.BeanName(instance.toString(), resource.toString());
+ if (!beanMap.containsKey(beanName)) {
+ beanMap.put(beanName, new HashMap<PartitionId, State>());
+ }
+ beanMap.get(beanName).put(partition, state);
+ }
+ }
+ }
+ // Unregister beans for per-instance resources that no longer exist
+ Set<PerInstanceResourceMonitor.BeanName> toUnregister =
+ Sets.newHashSet(_perInstanceResourceMap.keySet());
+ toUnregister.removeAll(beanMap.keySet());
+ try {
+ unregisterPerInstanceResources(toUnregister);
+ } catch (MalformedObjectNameException e) {
+ LOG.error("Fail to unregister per-instance resource from MBean server: " + toUnregister, e);
+ }
+ // Register beans for per-instance resources that are newly configured
+ Set<PerInstanceResourceMonitor.BeanName> toRegister = Sets.newHashSet(beanMap.keySet());
+ toRegister.removeAll(_perInstanceResourceMap.keySet());
+ Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
+ for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
+ PerInstanceResourceMonitor bean =
+ new PerInstanceResourceMonitor(_clusterName, beanName.instanceName(),
+ beanName.resourceName());
+ StateModelDefId stateModelDefId =
+ resourceMap.get(ResourceId.from(beanName.resourceName())).getRebalancerConfig()
+ .getStateModelDefId();
+ InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+ bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+ stateModelDefMap.get(stateModelDefId.toString()));
+ monitorsToRegister.add(bean);
+ }
+ try {
+ registerPerInstanceResources(monitorsToRegister);
+ } catch (MalformedObjectNameException e) {
+ LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
+ }
+ // Update existing beans
+ for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMap.keySet()) {
+ PerInstanceResourceMonitor bean = _perInstanceResourceMap.get(beanName);
+ StateModelDefId stateModelDefId =
+ resourceMap.get(ResourceId.from(beanName.resourceName())).getRebalancerConfig()
+ .getStateModelDefId();
+ InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+ bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+ stateModelDefMap.get(stateModelDefId.toString()));
+ }
+ }
+
+ public void setResourceStatus(ExternalView externalView, IdealState idealState) {
try {
String resourceName = externalView.getId();
if (!_resourceMbeanMap.containsKey(resourceName)) {
synchronized (this) {
if (!_resourceMbeanMap.containsKey(resourceName)) {
ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
- bean.updateExternalView(externalView, idealState);
+ bean.updateResource(externalView, idealState);
registerResources(Arrays.asList(bean));
}
}
}
ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
String oldSensorName = bean.getSensorName();
- bean.updateExternalView(externalView, idealState);
+ bean.updateResource(externalView, idealState);
String newSensorName = bean.getSensorName();
if (!oldSensorName.equals(newSensorName)) {
unregisterResources(Arrays.asList(resourceName));
registerResources(Arrays.asList(bean));
}
} catch (Exception e) {
- LOG.warn(e);
+ LOG.error("Fail to set resource status, resource: " + idealState.getResourceName(), e);
}
}
@@ -264,18 +352,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
_instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
} catch (Exception e) {
- LOG.warn("fail to add message queue size to mbean", e);
+ LOG.error("Fail to add message queue size to mbean, instance: " + instanceName, e);
}
}
public void reset() {
- LOG.info("Resetting ClusterStatusMonitor");
+ LOG.info("Reset ClusterStatusMonitor");
try {
- for (String resourceName : _resourceMbeanMap.keySet()) {
- String beanName =
- CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
- unregister(getObjectName(beanName));
- }
+ unregisterResources(_resourceMbeanMap.keySet());
+
_resourceMbeanMap.clear();
for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values()) {
@@ -286,9 +371,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
unregisterInstances(_instanceMbeanMap.keySet());
_instanceMbeanMap.clear();
- unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+ unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
+ unregister(getObjectName(clusterBeanName()));
} catch (Exception e) {
- LOG.error("fail to reset ClusterStatusMonitor", e);
+ LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
}
}
@@ -330,12 +416,60 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
_resourceMbeanMap.keySet().removeAll(resources);
}
+ private synchronized void registerPerInstanceResources(
+ Collection<PerInstanceResourceMonitor> monitors) throws MalformedObjectNameException {
+ for (PerInstanceResourceMonitor monitor : monitors) {
+ String instanceName = monitor.getInstanceName();
+ String resourceName = monitor.getResourceName();
+ String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
+ register(monitor, getObjectName(beanName));
+ _perInstanceResourceMap.put(new PerInstanceResourceMonitor.BeanName(instanceName,
+ resourceName), monitor);
+ }
+ }
+
+ private synchronized void unregisterPerInstanceResources(
+ Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+ throws MalformedObjectNameException {
+ for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
+ unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(),
+ beanName.resourceName())));
+ }
+ _perInstanceResourceMap.keySet().removeAll(beanNames);
+ }
+
+ public String clusterBeanName() {
+ return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
+ }
+
+ /**
+ * Build instance bean name
+ * @param instanceName
+ * @return instance bean name
+ */
private String getInstanceBeanName(String instanceName) {
- return CLUSTER_DN_KEY + "=" + _clusterName + "," + INSTANCE_DN_KEY + "=" + instanceName;
+ return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY, instanceName);
}
+ /**
+ * Build resource bean name
+ * @param resourceName
+ * @return resource bean name
+ */
private String getResourceBeanName(String resourceName) {
- return CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
+ return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY, resourceName);
+ }
+
+ /**
+ * Build per-instance resource bean name:
+ * "cluster={clusterName},instanceName={instanceName},resourceName={resourceName}"
+ * @param instanceName
+ * @param resourceName
+ * @return per-instance resource bean name
+ */
+ public String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
+ return String.format("%s,%s", clusterBeanName(), new PerInstanceResourceMonitor.BeanName(
+ instanceName, resourceName).toString());
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index 1385568..d9875cc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -54,8 +54,8 @@ public class InstanceMonitor implements InstanceMonitorMBean {
@Override
public String getSensorName() {
- return ClusterStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _clusterName + "."
- + serializedTags() + "." + _participantName;
+ return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+ serializedTags(), _participantName);
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
new file mode 100644
index 0000000..476445c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -0,0 +1,147 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.model.StateModelDefinition;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBean {
+ public static class BeanName {
+ private final String _instanceName;
+ private final String _resourceName;
+
+ public BeanName(String instanceName, String resourceName) {
+ if (instanceName == null || resourceName == null) {
+ throw new NullPointerException("Illegal beanName. instanceName: " + instanceName
+ + ", resourceName: " + resourceName);
+ }
+ _instanceName = instanceName;
+ _resourceName = resourceName;
+ }
+
+ public String instanceName() {
+ return _instanceName;
+ }
+
+ public String resourceName() {
+ return _resourceName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof BeanName)) {
+ return false;
+ }
+
+ BeanName that = (BeanName) obj;
+ return _instanceName.equals(that._instanceName) && _resourceName.equals(that._resourceName);
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s=%s,%s=%s", ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName,
+ ClusterStatusMonitor.RESOURCE_DN_KEY, _resourceName);
+ }
+ }
+
+ private final String _clusterName;
+ private List<String> _tags;
+ private final String _participantName;
+ private final String _resourceName;
+ private long _partitions;
+
+ public PerInstanceResourceMonitor(String clusterName, String participantName, String resourceName) {
+ _clusterName = clusterName;
+ _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+ _participantName = participantName;
+ _resourceName = resourceName;
+ _partitions = 0;
+ }
+
+ @Override
+ public String getSensorName() {
+ return Joiner
+ .on('.')
+ .join(
+ ImmutableList.of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+ serializedTags(), _participantName, _resourceName)).toString();
+ }
+
+ private String serializedTags() {
+ return Joiner.on('|').skipNulls().join(_tags).toString();
+ }
+
+ @Override
+ public long getPartitionGauge() {
+ return _partitions;
+ }
+
+ public String getInstanceName() {
+ return _participantName;
+ }
+
+ public String getResourceName() {
+ return _resourceName;
+ }
+
+ /**
+ * Update per-instance resource bean
+ * @param stateMap partition->state
+ * @param tags instance tags
+ * @param stateModelDef
+ */
+ public synchronized void update(Map<PartitionId, State> stateMap, Set<String> tags,
+ StateModelDefinition stateModelDef) {
+ if (tags == null || tags.isEmpty()) {
+ _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+ } else {
+ _tags = Lists.newArrayList(tags);
+ Collections.sort(_tags);
+ }
+
+ int cnt = 0;
+ for (State state : stateMap.values()) {
+ // Skip DROPPED and initial state (e.g. OFFLINE)
+ if (state.equals(HelixDefinedState.DROPPED)
+ || state.equals(stateModelDef.getTypedInitialState())) {
+ continue;
+ }
+ cnt++;
+ }
+ _partitions = cnt;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
new file mode 100644
index 0000000..4b544b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
@@ -0,0 +1,34 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * A bean that describes the resource on each instance
+ */
+public interface PerInstanceResourceMonitorMBean extends SensorNameProvider {
+ /**
+ * Get the number of partitions of the resource in best possible ideal state
+ * for the instance
+ * @return number of partitions
+ */
+ long getPartitionGauge();
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index d1ba595..4739fab 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,6 +19,7 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import java.util.Collections;
import java.util.Map;
import org.apache.helix.HelixDefinedState;
@@ -30,14 +31,15 @@ import org.apache.helix.model.IdealState;
import org.apache.log4j.Logger;
public class ResourceMonitor implements ResourceMonitorMBean {
- private int _numOfPartitions;
- int _numOfPartitionsInExternalView;
- int _numOfErrorPartitions;
- int _externalViewIdealStateDiff;
- String _tag = ClusterStatusMonitor.DEFAULT_TAG;
private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
- String _resourceName, _clusterName;
+ private int _numOfPartitions;
+ private int _numOfPartitionsInExternalView;
+ private int _numOfErrorPartitions;
+ private int _externalViewIdealStateDiff;
+ private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
+ private String _resourceName;
+ private String _clusterName;
public ResourceMonitor(String clusterName, String resourceName) {
_clusterName = clusterName;
@@ -61,15 +63,15 @@ public class ResourceMonitor implements ResourceMonitorMBean {
@Override
public String getSensorName() {
- return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "." + _clusterName + "." + _tag + "."
- + _resourceName;
+ return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+ _tag, _resourceName);
}
public String getResourceName() {
return _resourceName;
}
- public void updateExternalView(ExternalView externalView, IdealState idealState) {
+ public void updateResource(ExternalView externalView, IdealState idealState) {
if (externalView == null) {
LOG.warn("external view is null");
return;
@@ -97,6 +99,9 @@ public class ResourceMonitor implements ResourceMonitorMBean {
// or list fields (AUTO mode)
for (PartitionId partitionId : idealState.getPartitionIdSet()) {
Map<ParticipantId, State> idealRecord = idealState.getParticipantStateMap(partitionId);
+ if (idealRecord == null) {
+ idealRecord = Collections.emptyMap();
+ }
Map<ParticipantId, State> externalViewRecord = externalView.getStateMap(partitionId);
if (externalViewRecord == null) {
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index a00db67..0981a2e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -37,12 +37,14 @@ import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
+ private static final Logger LOG = Logger.getLogger(TestClusterStatusMonitorLifecycle.class);
MockParticipantManager[] _participants;
ClusterDistributedController[] _controllers;
@@ -176,12 +178,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
@Override
public void onMBeanRegistered(MBeanServerConnection server,
MBeanServerNotification mbsNotification) {
+ LOG.info("Register mbean: " + mbsNotification.getMBeanName());
_nMbeansRegistered++;
}
@Override
public void onMBeanUnRegistered(MBeanServerConnection server,
MBeanServerNotification mbsNotification) {
+ LOG.info("Unregister mbean: " + mbsNotification.getMBeanName());
_nMbeansUnregistered++;
}
}
@@ -196,10 +200,12 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
_participants[0].disconnect();
- // participant goes away. should be no change in number of beans as config is still present
+ // 1 participant goes away
+ // No change in instance/resource mbean
+ // Unregister 1 per-instance resource mbean
Thread.sleep(1000);
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
String firstControllerName =
@@ -215,19 +221,25 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
Thread.sleep(1000);
// 1 cluster status monitor, 1 resource monitor, 5 instances
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
+ // Unregister 1+4+1 per-instance resource mbean
+ // Register 4 per-instance resource mbean
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 11);
String instanceName = "localhost0_" + (12918 + 0);
_participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
_participants[0].syncStart();
- // participant goes back. should be no change
+ // 1 participant comes back
+ // No change in instance/resource mbean
+ // Register 1 per-instance resource mbean
Thread.sleep(1000);
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 12);
- // Add a resource, one more mbean registered
+ // Add a resource
+ // Register 1 resource mbean
+ // Register 5 per-instance resource mbean
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
@@ -237,14 +249,16 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
Integer.parseInt(idealState.getReplicas()));
Thread.sleep(1000);
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
- // remove resource, no change
+ // Remove a resource
+ // No change in instance/resource mbean
+ // Unregister 5 per-instance resource mbean
setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
Thread.sleep(1000);
- Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
- Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
+ Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 18);
+ Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index facb4ea..8c9ab01 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -19,98 +19,162 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
-import java.util.ArrayList;
+import java.lang.management.ManagementFactory;
import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.Mocks;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import java.util.Map;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.collect.Maps;
+
public class TestClusterStatusMonitor {
- List<String> _instances;
- List<ZNRecord> _liveInstances;
- String _db = "DB";
- String _db2 = "TestDB";
- int _replicas = 3;
- int _partitions = 50;
- ZNRecord _externalView, _externalView2;
-
- class MockDataAccessor extends Mocks.MockAccessor {
- public MockDataAccessor() {
- _instances = new ArrayList<String>();
- for (int i = 0; i < 5; i++) {
- String instance = "localhost_" + (12918 + i);
- _instances.add(instance);
- }
- ZNRecord externalView =
- DefaultTwoStateStrategy.calculateIdealState(_instances, _partitions, _replicas, _db,
- "MASTER", "SLAVE");
-
- ZNRecord externalView2 =
- DefaultTwoStateStrategy.calculateIdealState(_instances, 80, 2, _db2, "MASTER", "SLAVE");
- }
+ private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+
+ @Test()
+ public void testReportData() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 5;
+ String testDB = "TestDB";
+ String testDB_0 = testDB + "_0";
- public ZNRecord getProperty(PropertyType type, String resource) {
- if (type == PropertyType.IDEALSTATES || type == PropertyType.EXTERNALVIEW) {
- if (resource.equals(_db)) {
- return _externalView;
- } else if (resource.equals(_db2)) {
- return _externalView2;
- }
- }
- return null;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+ try {
+ _server.getMBeanInfo(clusterMonitorObjName);
+ } catch (Exception e) {
+ Assert.fail("Fail to register ClusterStatusMonitor");
}
- }
- class MockHelixManager extends Mocks.MockManager {
- MockDataAccessor _accessor = new MockDataAccessor();
+ // Test #setPerInstanceResourceStatus()
+ BestPossibleStateOutput bestPossibleStates = new BestPossibleStateOutput();
+ ResourceAssignment assignment = new ResourceAssignment(ResourceId.from(testDB));
+ Map<ParticipantId, State> replicaMap = Maps.newHashMap();
+ replicaMap.put(ParticipantId.from("localhost_12918"), State.from("MASTER"));
+ replicaMap.put(ParticipantId.from("localhost_12919"), State.from("SLAVE"));
+ replicaMap.put(ParticipantId.from("localhost_12920"), State.from("SLAVE"));
+ replicaMap.put(ParticipantId.from("localhost_12921"), State.from("OFFLINE"));
+ replicaMap.put(ParticipantId.from("localhost_12922"), State.from("DROPPED"));
+ assignment.addReplicaMap(PartitionId.from(testDB_0), replicaMap);
+ bestPossibleStates.setResourceAssignment(ResourceId.from(testDB), assignment);
- @Override
- public HelixDataAccessor getHelixDataAccessor() {
- return _accessor;
+ Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ InstanceConfig config = new InstanceConfig(instanceName);
+ instanceConfigMap.put(instanceName, config);
}
- }
+ Map<ResourceId, ResourceConfig> resourceMap = Maps.newHashMap();
+ ResourceId resourceId = ResourceId.from(testDB);
+ RebalancerConfig rebalancerConfig =
+ new SemiAutoRebalancerConfig.Builder(resourceId)
+ .addPartition(new Partition(PartitionId.from(testDB_0)))
+ .stateModelDefId(StateModelDefId.from("MasterSlave")).build();
+ ResourceConfig resourceConfig =
+ new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerConfig).build();
+ resourceMap.put(resourceId, resourceConfig);
- @Test()
- public void TestReportData() {
- System.out.println("START TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
- List<String> _instances;
- List<ZNRecord> _liveInstances = new ArrayList<ZNRecord>();
- String _db = "DB";
- int _replicas = 3;
- int _partitions = 50;
-
- _instances = new ArrayList<String>();
- for (int i = 0; i < 5; i++) {
- String instance = "localhost_" + (12918 + i);
- _instances.add(instance);
- ZNRecord metaData = new ZNRecord(instance);
- metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
- .toString());
- _liveInstances.add(metaData);
+ Map<String, StateModelDefinition> stateModelDefMap = Maps.newHashMap();
+ StateModelDefinition msStateModelDef =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ stateModelDefMap.put("MasterSlave", msStateModelDef);
+
+ monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+ stateModelDefMap);
+
+ // localhost_12918 should have 1 partition because it's MASTER
+ ObjectName objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+ Object value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(1));
+ value = _server.getAttribute(objName, "SensorName");
+ Assert.assertTrue(value instanceof String);
+ Assert.assertEquals((String) value, String.format("%s.%s.%s.%s.%s",
+ ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, clusterName, ClusterStatusMonitor.DEFAULT_TAG,
+ "localhost_12918", testDB));
+
+ // localhost_12919 should have 1 partition because it's SLAVE
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12919", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(1));
+
+ // localhost_12921 should have 0 partition because it's OFFLINE
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12921", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(0));
+
+ // localhost_12922 should have 0 partition because it's DROPPED
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12922", testDB));
+ value = _server.getAttribute(objName, "PartitionGauge");
+ Assert.assertTrue(value instanceof Long);
+ Assert.assertEquals((Long) value, new Long(0));
+
+ // Missing localhost_12918 in best possible ideal-state should remove it from mbean
+ replicaMap.remove(ParticipantId.from("localhost_12918"));
+ assignment.addReplicaMap(PartitionId.from(testDB_0), replicaMap);
+ bestPossibleStates.setResourceAssignment(ResourceId.from(testDB), assignment);
+ monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+ stateModelDefMap);
+ try {
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+ _server.getMBeanInfo(objName);
+ Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12918");
+
+ } catch (InstanceNotFoundException e) {
+ // OK
}
- ZNRecord externalView =
- DefaultTwoStateStrategy.calculateIdealState(_instances, _partitions, _replicas, _db,
- "MASTER", "SLAVE");
- ZNRecord externalView2 =
- DefaultTwoStateStrategy.calculateIdealState(_instances, 80, 2, "TestDB", "MASTER", "SLAVE");
+ // Clean up
+ monitor.reset();
+
+ try {
+ objName =
+ monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
+ _server.getMBeanInfo(objName);
+ Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12920");
- List<ZNRecord> externalViews = new ArrayList<ZNRecord>();
- externalViews.add(externalView);
- externalViews.add(externalView2);
+ } catch (InstanceNotFoundException e) {
+ // OK
+ }
+
+ try {
+ _server.getMBeanInfo(clusterMonitorObjName);
+ Assert.fail("Fail to unregister ClusterStatusMonitor");
+ } catch (InstanceNotFoundException e) {
+ // OK
+ }
- ClusterStatusMonitor monitor = new ClusterStatusMonitor("cluster1");
- MockHelixManager manager = new MockHelixManager();
- NotificationContext context = new NotificationContext(manager);
- System.out.println("END TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index e8bb4b6..dcca755 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -28,14 +28,13 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.Mocks;
import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.testng.AssertJUnit;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class TestResourceMonitor {
@@ -106,46 +105,52 @@ public class TestResourceMonitor {
}
@Test()
- public void TestReportData() {
- MockHelixManager manager = new MockHelixManager();
+ public void testReportData() {
+ final int n = 5;
ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- ExternalView externalView = helixDataAccessor.getProperty(keyBuilder.externalView(_dbName));
- IdealState idealState = helixDataAccessor.getProperty(keyBuilder.idealStates(_dbName));
+ List<String> instances = new ArrayList<String>();
+ for (int i = 0; i < n; i++) {
+ String instance = "localhost_" + (12918 + i);
+ instances.add(instance);
+ }
- monitor.updateExternalView(externalView, idealState);
+ ZNRecord idealStateRecord =
+ DefaultTwoStateStrategy.calculateIdealState(instances, _partitions, _replicas, _dbName,
+ "MASTER", "SLAVE");
+ IdealState idealState = new IdealState(idealStateRecord);
+ ExternalView externalView = new ExternalView(idealStateRecord);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 0);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
- monitor.getBeanName();
+ monitor.updateResource(externalView, idealState);
- int n = 4;
- for (int i = 0; i < n; i++) {
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+ // monitor.getBeanName();
+
+ final int m = n - 1;
+ for (int i = 0; i < m; i++) {
Map<String, String> map = externalView.getStateMap(_dbName + "_" + 3 * i);
String key = map.keySet().toArray()[0].toString();
map.put(key, "ERROR");
externalView.setStateMap(_dbName + "_" + 3 * i, map);
}
- monitor.updateExternalView(externalView, idealState);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), n);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+ monitor.updateResource(externalView, idealState);
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), m);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
- n = 5;
for (int i = 0; i < n; i++) {
externalView.getRecord().getMapFields().remove(_dbName + "_" + 4 * i);
}
- monitor.updateExternalView(externalView, idealState);
- AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
- AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 3);
- AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
- AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+ monitor.updateResource(externalView, idealState);
+ Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
+ Assert.assertEquals(monitor.getErrorPartitionGauge(), 3);
+ Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
+ Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
}
}
[6/7] git commit: [HELIX-94] Add the ability to enable and disable a resource, rb=20401
Posted by ka...@apache.org.
[HELIX-94] Add the ability to enable and disable a resource, rb=20401
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c1af744a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c1af744a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c1af744a
Branch: refs/heads/master
Commit: c1af744af6088ad98fe2a8a5074f2c56199b6f82
Parents: 96aef71
Author: zzhang <zz...@apache.org>
Authored: Wed May 21 14:56:47 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jul 11 10:04:37 2014 -0700
----------------------------------------------------------------------
.../helix/webapp/resources/JsonParameters.java | 4 +
.../webapp/resources/ResourceGroupResource.java | 7 +
.../helix/webapp/TestDisableResource.java | 84 ++++++
.../main/java/org/apache/helix/HelixAdmin.java | 7 +
.../controller/rebalancer/CustomRebalancer.java | 5 +-
.../rebalancer/FallbackRebalancer.java | 4 +-
.../rebalancer/FullAutoRebalancer.java | 5 +-
.../rebalancer/SemiAutoRebalancer.java | 5 +-
.../config/SemiAutoRebalancerConfig.java | 2 +-
.../util/ConstraintBasedAssignment.java | 26 +-
.../stages/BestPossibleStateCalcStage.java | 4 +-
.../apache/helix/manager/zk/ZKHelixAdmin.java | 84 ++++--
.../java/org/apache/helix/model/IdealState.java | 21 +-
.../participant/HelixCustomCodeRunner.java | 16 +-
.../org/apache/helix/tools/ClusterSetup.java | 17 +-
.../strategy/TestAutoRebalanceStrategy.java | 2 +-
.../strategy/TestNewAutoRebalanceStrategy.java | 2 +-
.../TestDisableCustomCodeRunner.java | 252 +++++++++++++++++
.../helix/integration/TestDisableResource.java | 267 +++++++++++++++++++
.../helix/manager/zk/TestZkHelixAdmin.java | 34 ++-
.../apache/helix/tools/TestClusterSetup.java | 39 ++-
21 files changed, 826 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 5f405d8..19ac71a 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -165,6 +165,10 @@ public class JsonParameters {
if (!_parameterMap.containsKey(ENABLED)) {
throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
}
+ } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+ if (!_parameterMap.containsKey(ENABLED)) {
+ throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
+ }
} else if (command.equalsIgnoreCase(ClusterSetup.enablePartition)) {
if (!_parameterMap.containsKey(ENABLED)) {
throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
index 055f64a..6dc721d 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
@@ -114,6 +114,13 @@ public class ResourceGroupResource extends ServerResource {
ClusterSetup setupTool = new ClusterSetup(zkClient);
setupTool.getClusterManagementTool()
.resetResource(clusterName, Arrays.asList(resourceName));
+ } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+ jsonParameters.verifyCommand(ClusterSetup.enableResource);
+ boolean enabled = Boolean.parseBoolean(jsonParameters.getParameter(JsonParameters.ENABLED));
+ ZkClient zkClient =
+ (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+ ClusterSetup setupTool = new ClusterSetup(zkClient);
+ setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
} else {
throw new HelixException("Unsupported command: " + command + ". Should be one of ["
+ ClusterSetup.resetResource + "]");
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
new file mode 100644
index 0000000..9800179
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
@@ -0,0 +1,84 @@
+package org.apache.helix.webapp;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends AdminTestBase {
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+
+ String instanceUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+ + "TestDB0";
+
+ // Disable TestDB0
+ Map<String, String> paramMap = new HashMap<String, String>();
+ paramMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableResource);
+ paramMap.put(JsonParameters.ENABLED, Boolean.toString(false));
+ TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertFalse(idealState.isEnabled());
+
+ // Re-enable TestDB0
+ paramMap.put(JsonParameters.ENABLED, Boolean.toString(true));
+ TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+ idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertTrue(idealState.isEnabled());
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 892bada..dce3893 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -177,6 +177,13 @@ public interface HelixAdmin {
void enableInstance(String clusterName, String instanceName, boolean enabled);
/**
+ * Disable or enable a resource
+ * @param clusterName
+ * @param resourceName
+ */
+ void enableResource(String clusterName, String resourceName, boolean enabled);
+
+ /**
* Disable or enable a list of partitions on an instance
* @param enabled
* @param clusterName
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 6629bec..4d5c373 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -14,6 +14,7 @@ import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -51,6 +52,8 @@ public class CustomRebalancer implements HelixRebalancer {
ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
CustomRebalancerConfig config =
BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
+ IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+ boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
@@ -65,7 +68,7 @@ public class CustomRebalancer implements HelixRebalancer {
Map<ParticipantId, State> bestStateForPartition =
ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
.getLiveParticipantMap().keySet(), stateModelDef, config.getPreferenceMap(partition),
- currentStateMap, disabledInstancesForPartition);
+ currentStateMap, disabledInstancesForPartition, isEnabled);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index 3aa41d7..e00e57c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -160,7 +160,7 @@ public class FallbackRebalancer implements HelixRebalancer {
ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
.getLiveParticipantMap().keySet(), stateModelDef, newIdealState
.getParticipantStateMap(partitionId), currentState.getCurrentStateMap(resourceId,
- partitionId), disabledParticipants);
+ partitionId), disabledParticipants, idealState.isEnabled());
assignment.addReplicaMap(partitionId, replicaMap);
}
} else {
@@ -176,7 +176,7 @@ public class FallbackRebalancer implements HelixRebalancer {
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
.getLiveParticipantMap().keySet(), stateModelDef, newIdealState
.getPreferenceList(partitionId), currentState.getCurrentStateMap(resourceId,
- partitionId), disabledParticipants);
+ partitionId), disabledParticipants, idealState.isEnabled());
assignment.addReplicaMap(partitionId, replicaMap);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 13616b3..4bf030b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -25,6 +25,7 @@ import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -67,6 +68,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
FullAutoRebalancerConfig config =
BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
+ IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+ boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
// Compute a preference list based on the current ideal state
@@ -176,7 +179,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
liveParticipants.keySet(), stateModelDef, preferenceList,
currentState.getCurrentStateMap(config.getResourceId(), partition),
- disabledParticipantsForPartition);
+ disabledParticipantsForPartition, isEnabled);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index 51ca463..07f6337 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -15,6 +15,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -55,6 +56,8 @@ public class SemiAutoRebalancer implements HelixRebalancer {
ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
SemiAutoRebalancerConfig config =
BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
+ IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+ boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
@@ -77,7 +80,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
Map<ParticipantId, State> bestStateForPartition =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
.getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
- disabledInstancesForPartition);
+ disabledInstancesForPartition, isEnabled);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
index 727c3df..60e30f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -116,7 +116,7 @@ public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig
Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
Map<ParticipantId, State> initialMap =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
- stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
+ stateModelDef, preferenceList, emptyCurrentState, disabledParticipants, true);
currentMapping.put(partitionId, initialMap);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 7951784..addd652 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -117,23 +117,24 @@ public class ConstraintBasedAssignment {
* @param participants participants selected to serve the partition
* @param disabledParticipants participants that have been disabled for this partition
* @param initialState the initial state of the resource state model
+ * @param isEnabled true if resource is enabled, false otherwise
* @return map of participant id to state of dropped and disabled partitions
*/
public static Map<ParticipantId, State> dropAndDisablePartitions(
Map<ParticipantId, State> currentStateMap, Collection<ParticipantId> participants,
- Set<ParticipantId> disabledParticipants, State initialState) {
+ Set<ParticipantId> disabledParticipants, boolean isEnabled, State initialState) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
// if the resource is deleted, instancePreferenceList will be empty and
// we should drop all resources.
if (currentStateMap != null) {
for (ParticipantId participantId : currentStateMap.keySet()) {
if ((participants == null || !participants.contains(participantId))
- && !disabledParticipants.contains(participantId)) {
+ && !disabledParticipants.contains(participantId) && isEnabled) {
// if dropped and not disabled, transit to DROPPED
participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
} else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipants.contains(participantId)) {
+ && (disabledParticipants.contains(participantId) || !isEnabled)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
participantStateMap.put(participantId, initialState);
}
@@ -151,16 +152,18 @@ public class ConstraintBasedAssignment {
* @param currentStateMap
* : participant->state for each partition
* @param disabledParticipantsForPartition
+ * @param isEnabled true if resource is enabled, false if disabled
* @return
*/
public static Map<ParticipantId, State> computeAutoBestStateForPartition(
Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
- Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+ Map<ParticipantId, State> currentStateMap,
+ Set<ParticipantId> disabledParticipantsForPartition, boolean isEnabled) {
// drop and disable participants if necessary
Map<ParticipantId, State> participantStateMap =
dropAndDisablePartitions(currentStateMap, participantPreferenceList,
- disabledParticipantsForPartition, stateModelDef.getTypedInitialState());
+ disabledParticipantsForPartition, isEnabled, stateModelDef.getTypedInitialState());
// resource is deleted
if (participantPreferenceList == null) {
@@ -176,7 +179,7 @@ public class ConstraintBasedAssignment {
if ("N".equals(num)) {
Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
liveAndEnabled.removeAll(disabledParticipantsForPartition);
- stateCount = liveAndEnabled.size();
+ stateCount = isEnabled ? liveAndEnabled.size() : 0;
} else if ("R".equals(num)) {
stateCount = participantPreferenceList.size();
} else {
@@ -198,7 +201,7 @@ public class ConstraintBasedAssignment {
.equals(State.from(HelixDefinedState.ERROR));
if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
- && !disabledParticipantsForPartition.contains(participantId)) {
+ && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
participantStateMap.put(participantId, state);
count = count + 1;
assigned[i] = true;
@@ -268,12 +271,13 @@ public class ConstraintBasedAssignment {
* @param preferenceMap
* @param currentStateMap
* @param disabledParticipantsForPartition
+ * @param isEnabled true if resource is enabled, false if disabled
* @return
*/
public static Map<ParticipantId, State> computeCustomizedBestStateForPartition(
Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
- Set<ParticipantId> disabledParticipantsForPartition) {
+ Set<ParticipantId> disabledParticipantsForPartition, boolean isEnabled) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
// if the resource is deleted, idealStateMap will be null/empty and
@@ -281,12 +285,12 @@ public class ConstraintBasedAssignment {
if (currentStateMap != null) {
for (ParticipantId participantId : currentStateMap.keySet()) {
if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
- && !disabledParticipantsForPartition.contains(participantId)) {
+ && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
// if dropped and not disabled, transit to DROPPED
participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
} else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
participantId).equals(State.from(HelixDefinedState.ERROR)))
- && disabledParticipantsForPartition.contains(participantId)) {
+ && (disabledParticipantsForPartition.contains(participantId) || !isEnabled)) {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
}
@@ -304,7 +308,7 @@ public class ConstraintBasedAssignment {
|| !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
if (liveParticipantSet.contains(participantId) && notInErrorState
- && !disabledParticipantsForPartition.contains(participantId)) {
+ && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
participantStateMap.put(participantId, preferenceMap.get(participantId));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 2f93b7f..8c34f6b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -120,7 +120,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
.computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
- disabledParticipantsForPartition));
+ disabledParticipantsForPartition, true));
}
return partitionMapping;
}
@@ -163,7 +163,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Set<ParticipantId> participants = participantStateMap.keySet();
Map<ParticipantId, State> droppedAndDisabledMap =
ConstraintBasedAssignment.dropAndDisablePartitions(currentStateMap, participants,
- disabledParticipants, initialState);
+ disabledParticipants, true, initialState);
// don't map error participants
for (ParticipantId participantId : errorParticipants) {
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 6dc5541..da0c80c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
@@ -130,7 +131,6 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
- // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
String instanceConfigsPath =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString());
@@ -157,9 +157,6 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
- // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
-
- // String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
String instanceConfigPath =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
@@ -168,7 +165,7 @@ public class ZKHelixAdmin implements HelixAdmin {
+ clusterName);
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -182,7 +179,7 @@ public class ZKHelixAdmin implements HelixAdmin {
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
if (!baseAccessor.exists(path, 0)) {
throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
+ ", instance config does not exist");
@@ -204,13 +201,36 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
+ public void enableResource(final String clusterName, final String resourceName,
+ final boolean enabled) {
+ String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ if (!baseAccessor.exists(path, 0)) {
+ throw new HelixException("Cluster " + clusterName + ", resource: " + resourceName
+ + ", ideal-state does not exist");
+ }
+ baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
+ + ", ideal-state is null");
+ }
+ IdealState idealState = new IdealState(currentData);
+ idealState.enable(enabled);
+ return idealState.getRecord();
+ }
+ }, AccessOption.PERSISTENT);
+ }
+
+ @Override
public void enablePartition(final boolean enabled, final String clusterName,
final String instanceName, final String resourceName, final List<String> partitionNames) {
String path =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
// check instanceConfig exists
if (!baseAccessor.exists(path, 0)) {
@@ -299,7 +319,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetPartition(String clusterName, String instanceName, String resourceName,
List<String> partitionNames) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -404,7 +424,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetInstance(String clusterName, List<String> instanceNames) {
// TODO: not mp-safe
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -430,7 +450,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void resetResource(String clusterName, List<String> resourceNames) {
// TODO: not mp-safe
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -508,7 +528,6 @@ public class ZKHelixAdmin implements HelixAdmin {
// IDEAL STATE
_zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
// CONFIGURATIONS
- // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
path =
PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
ConfigScopeProperty.CLUSTER.toString(), clusterName);
@@ -564,7 +583,7 @@ public class ZKHelixAdmin implements HelixAdmin {
List<String> instances = _zkClient.getChildren(memberInstancesPath);
List<String> result = new ArrayList<String>();
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -662,7 +681,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public IdealState getResourceIdealState(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -671,7 +690,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -680,7 +699,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public ExternalView getResourceExternalView(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.externalView(resourceName));
@@ -699,7 +718,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException("State model path " + stateModelPath + " already exists.");
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
@@ -707,7 +726,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropResource(String clusterName, String resourceName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -722,7 +741,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -805,7 +824,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void dropCluster(String clusterName) {
logger.info("Deleting cluster " + clusterName);
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -1067,15 +1086,23 @@ public class ZKHelixAdmin implements HelixAdmin {
int size = (int) file.length();
byte[] bytes = new byte[size];
- DataInputStream dis = new DataInputStream(new FileInputStream(file));
- int read = 0;
- int numRead = 0;
- while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
- read = read + numRead;
+ DataInputStream dis = null;
+ try {
+ dis = new DataInputStream(new FileInputStream(file));
+ int read = 0;
+ int numRead = 0;
+ while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
+ read = read + numRead;
+ }
+ return bytes;
+ } finally {
+ if (dis != null) {
+ dis.close();
+ }
}
- return bytes;
}
+ @Override
public void addStateModelDef(String clusterName, String stateModelDefName,
String stateModelDefFile) throws IOException {
ZNRecord record =
@@ -1091,7 +1118,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void setConstraint(String clusterName, final ConstraintType constraintType,
final String constraintId, final ConstraintItem constraintItem) {
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
Builder keyBuilder = new Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1112,7 +1139,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void removeConstraint(String clusterName, final ConstraintType constraintType,
final String constraintId) {
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
Builder keyBuilder = new Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1210,7 +1237,7 @@ public class ZKHelixAdmin implements HelixAdmin {
throw new HelixException("cluster " + clusterName + " instance " + instanceName
+ " is not setup yet");
}
- ZKHelixDataAccessor accessor =
+ HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -1238,6 +1265,7 @@ public class ZKHelixAdmin implements HelixAdmin {
accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
}
+ @Override
public void close() {
if (_zkClient != null) {
_zkClient.close();
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ed17c5b..dc42a52 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -81,7 +81,8 @@ public class IdealState extends HelixProperty {
MAX_PARTITIONS_PER_INSTANCE,
INSTANCE_GROUP_TAG,
REBALANCER_CLASS_NAME,
- REBALANCER_CONFIG_NAME
+ REBALANCER_CONFIG_NAME,
+ HELIX_ENABLED
}
public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -821,6 +822,7 @@ public class IdealState extends HelixProperty {
}
/**
+ *
* Get the non-Helix simple fields from this property and add them to a UserConfig
* @param userConfig the user config to update
*/
@@ -950,4 +952,21 @@ public class IdealState extends HelixProperty {
Map<PartitionId, Map<ParticipantId, State>> participantStateMaps) {
return ResourceAssignment.stringMapsFromReplicaMaps(participantStateMaps);
}
+
+ /**
+ * Check whether the resource is enabled.
+ * A resource is enabled by default.
+ * @return true if enabled; false otherwise
+ */
+ public boolean isEnabled() {
+ return _record.getBooleanField(IdealStateProperty.HELIX_ENABLED.name(), true);
+ }
+
+ /**
+ * Enable/Disable the resource
+ * @param enabled true to enable the resource, false to disable it
+ */
+ public void enable(boolean enabled) {
+ _record.setSimpleField(IdealStateProperty.HELIX_ENABLED.name(), Boolean.toString(enabled));
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index bddd8d1..2f169cc 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -51,7 +51,7 @@ import org.apache.log4j.Logger;
* .invoke(_callback)
* .on(ChangeType.LIVE_INSTANCE, ChangeType.IdealState)
* .usingLeaderStandbyModel("someUniqueId")
- * .run()
+ * .start()
* </code>
*/
public class HelixCustomCodeRunner {
@@ -106,6 +106,15 @@ public class HelixCustomCodeRunner {
}
/**
+ * Get resource name for the custom-code runner
+ * Used for retrieving the external view for the custom-code runner resource
+ * @return resource name for the custom-code runner
+ */
+ public String getResourceName() {
+ return _resourceName;
+ }
+
+ /**
* This method will be invoked when there is a change in any subscribed
* notificationTypes
* @throws Exception
@@ -127,8 +136,9 @@ public class HelixCustomCodeRunner {
// manually add ideal state for participant leader using LeaderStandby
// model
- zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
- zkClient.setZkSerializer(new ZNRecordSerializer());
+ zkClient =
+ new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
HelixDataAccessor accessor =
new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>(
zkClient));
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index d97a853..4479f97 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -98,6 +98,8 @@ public class ClusterSetup {
public static final String addInstanceTag = "addInstanceTag";
public static final String removeInstanceTag = "removeInstanceTag";
+ public static final String enableResource = "enableResource";
+
// Query info (TBD in V2)
public static final String listClusterInfo = "listClusterInfo";
public static final String listInstanceInfo = "listInstanceInfo";
@@ -745,6 +747,11 @@ public class ClusterSetup {
dropResourceOption.setRequired(false);
dropResourceOption.setArgName("clusterName resourceName");
+ Option enableResourceOption =
+ OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource")
+ .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false")
+ .create();
+
Option rebalanceOption =
OptionBuilder.withLongOpt(rebalance).withDescription("Rebalance a resource in a cluster")
.create();
@@ -795,11 +802,11 @@ public class ClusterSetup {
partitionInfoOption.setArgName("clusterName resourceName partitionName");
Option enableInstanceOption =
- OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable a Instance")
+ OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable an instance")
.create();
enableInstanceOption.setArgs(3);
enableInstanceOption.setRequired(false);
- enableInstanceOption.setArgName("clusterName InstanceName true/false");
+ enableInstanceOption.setArgName("clusterName instanceName true/false");
Option enablePartitionOption =
OptionBuilder.hasArgs().withLongOpt(enablePartition)
@@ -955,6 +962,7 @@ public class ClusterSetup {
group.addOption(dropInstanceOption);
group.addOption(swapInstanceOption);
group.addOption(dropResourceOption);
+ group.addOption(enableResourceOption);
group.addOption(instanceInfoOption);
group.addOption(clusterInfoOption);
group.addOption(resourceInfoOption);
@@ -1258,6 +1266,11 @@ public class ClusterSetup {
setupTool.getClusterManagementTool().enableInstance(clusterName, instanceName, enabled);
return 0;
+ } else if (cmd.hasOption(enableResource)) {
+ String clusterName = cmd.getOptionValues(enableResource)[0];
+ String resourceName = cmd.getOptionValues(enableResource)[1];
+ boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase());
+ setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
} else if (cmd.hasOption(enablePartition)) {
String[] args = cmd.getOptionValues(enablePartition);
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 7c74035..1322b40 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -239,7 +239,7 @@ public class TestAutoRebalanceStrategy {
Map<ParticipantId, State> assignment =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
liveParticipantSet, _stateModelDef, preferenceList, currentStateMap,
- disabledParticipantsForPartition);
+ disabledParticipantsForPartition, true);
mapResult.put(partition, IdealState.stringMapFromParticipantStateMap(assignment));
}
return mapResult;
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index 2c26d5d..c8ec90a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -265,7 +265,7 @@ public class TestNewAutoRebalanceStrategy {
Map<ParticipantId, State> assignment =
ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
liveParticipantMap.keySet(), _stateModelDef, participantPreferenceList, replicaMap,
- disabledParticipantsForPartition);
+ disabledParticipantsForPartition, true);
mapResult.put(partitionId, assignment);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
new file mode 100644
index 0000000..3223e48
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -0,0 +1,252 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.CustomCodeCallbackHandler;
+import org.apache.helix.participant.HelixCustomCodeRunner;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
+
+ private static final int N = 2;
+ private static final int PARTITION_NUM = 1;
+
+ class DummyCallback implements CustomCodeCallbackHandler {
+ private final Map<NotificationContext.Type, Boolean> _callbackInvokeMap =
+ new HashMap<NotificationContext.Type, Boolean>();
+
+ @Override
+ public void onCallback(NotificationContext context) {
+ NotificationContext.Type type = context.getType();
+ _callbackInvokeMap.put(type, Boolean.TRUE);
+ }
+
+ public void reset() {
+ _callbackInvokeMap.clear();
+ }
+
+ public boolean isInitTypeInvoked() {
+ return _callbackInvokeMap.containsKey(NotificationContext.Type.INIT);
+ }
+
+ public boolean isCallbackTypeInvoked() {
+ return _callbackInvokeMap.containsKey(NotificationContext.Type.CALLBACK);
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ PARTITION_NUM, // partitions per resource
+ N, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ Map<String, MockParticipantManager> participants =
+ new HashMap<String, MockParticipantManager>();
+ Map<String, HelixCustomCodeRunner> customCodeRunners =
+ new HashMap<String, HelixCustomCodeRunner>();
+ Map<String, DummyCallback> callbacks =
+ new HashMap<String, DummyCallback>();
+ for (int i = 0; i < N; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants
+ .put(instanceName, new MockParticipantManager(ZK_ADDR, clusterName, instanceName));
+
+ customCodeRunners.put(instanceName, new HelixCustomCodeRunner(participants.get(instanceName),
+ ZK_ADDR));
+ callbacks.put(instanceName, new DummyCallback());
+
+ customCodeRunners.get(instanceName).invoke(callbacks.get(instanceName))
+ .on(ChangeType.LIVE_INSTANCE)
+ .usingLeaderStandbyModel("TestParticLeader").start();
+ participants.get(instanceName).syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Make sure callback is registered
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ final String customCodeRunnerResource =
+ customCodeRunners.get("localhost_12918").getResourceName();
+ ExternalView extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+ Map<String, String> instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+ String leader = null;
+ for (String instance : instanceStates.keySet()) {
+ String state = instanceStates.get(instance);
+ if ("LEADER".equals(state)) {
+ leader = instance;
+ break;
+ }
+ }
+ Assert.assertNotNull(leader);
+ for (String instance : callbacks.keySet()) {
+ DummyCallback callback = callbacks.get(instance);
+ if (instance.equals(leader)) {
+ Assert.assertTrue(callback.isInitTypeInvoked());
+ } else {
+ Assert.assertFalse(callback.isInitTypeInvoked());
+ }
+ callback.reset();
+ }
+
+ // Disable custom-code runner resource
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.enableResource(clusterName, customCodeRunnerResource, false);
+
+ // Verify that states of custom-code runner are all OFFLINE
+ result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ ExternalView extView =
+ accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+ if (extView == null) {
+ return false;
+ }
+ Set<String> partitionSet = extView.getPartitionSet();
+ if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+ return false;
+ }
+ for (String partition : partitionSet) {
+ Map<String, String> instanceStates = extView.getStateMap(partition);
+ for (String state : instanceStates.values()) {
+ if (!"OFFLINE".equals(state)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }, 10 * 1000);
+ Assert.assertTrue(result);
+
+ // Changing a live instance should not invoke any custom-code runner
+ LiveInstance fakeInstance = new LiveInstance("fakeInstance");
+ fakeInstance.setSessionId("fakeSessionId");
+ fakeInstance.setHelixVersion("0.6");
+ accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+ Thread.sleep(1000);
+
+ for (DummyCallback callback : callbacks.values()) {
+ Assert.assertFalse(callback.isInitTypeInvoked());
+ Assert.assertFalse(callback.isCallbackTypeInvoked());
+ }
+
+ // Remove fake instance
+ accessor.removeProperty(keyBuilder.liveInstance("fakeInstance"));
+
+ // Re-enable custom-code runner
+ admin.enableResource(clusterName, customCodeRunnerResource, true);
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Verify that the custom-code runner is invoked again
+ extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+ instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+ leader = null;
+ for (String instance : instanceStates.keySet()) {
+ String state = instanceStates.get(instance);
+ if ("LEADER".equals(state)) {
+ leader = instance;
+ break;
+ }
+ }
+ Assert.assertNotNull(leader);
+ for (String instance : callbacks.keySet()) {
+ DummyCallback callback = callbacks.get(instance);
+ if (instance.equals(leader)) {
+ Assert.assertTrue(callback.isInitTypeInvoked());
+ } else {
+ Assert.assertFalse(callback.isInitTypeInvoked());
+ }
+ callback.reset();
+ }
+
+ // Adding a fake instance should invoke the custom-code runner
+ accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+ Thread.sleep(1000);
+ for (String instance : callbacks.keySet()) {
+ DummyCallback callback = callbacks.get(instance);
+ if (instance.equals(leader)) {
+ Assert.assertTrue(callback.isCallbackTypeInvoked());
+ } else {
+ Assert.assertFalse(callback.isCallbackTypeInvoked());
+ }
+ }
+
+ // Clean up
+ controller.syncStop();
+ for (MockParticipantManager participant : participants.values()) {
+ participant.syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
new file mode 100644
index 0000000..ea8974d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
@@ -0,0 +1,267 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends ZkUnitTestBase {
+ private static final int N = 2;
+ private static final int PARTITION_NUM = 1;
+
+ @Test
+ public void testDisableResourceInSemiAutoMode() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ PARTITION_NUM, // partitions per resource
+ N, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager participants[] = new MockParticipantManager[N];
+ for (int i = 0; i < N; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Disable TestDB0
+ enableResource(clusterName, false);
+ checkExternalView(clusterName);
+
+ // Re-enable TestDB0
+ enableResource(clusterName, true);
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ ZK_ADDR, clusterName));
+ Assert.assertTrue(result);
+
+ // Clean up
+ controller.syncStop();
+ for (int i = 0; i < N; i++) {
+ participants[i].syncStop();
+ }
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDisableResourceInFullAutoMode() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ PARTITION_NUM, // partitions per resource
+ N, // number of nodes
+ 2, // replicas
+ "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager participants[] = new MockParticipantManager[N];
+ for (int i = 0; i < N; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Disable TestDB0
+ enableResource(clusterName, false);
+ checkExternalView(clusterName);
+
+ // Re-enable TestDB0
+ enableResource(clusterName, true);
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ ZK_ADDR, clusterName));
+ Assert.assertTrue(result);
+
+ // Clean up
+ controller.syncStop();
+ for (int i = 0; i < N; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDisableResourceInCustomMode() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ PARTITION_NUM, // partitions per resource
+ N, // number of nodes
+ 2, // replicas
+ "MasterSlave", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+ // set up custom ideal-state
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setPartitionState("TestDB0_0", "localhost_12918", "SLAVE");
+ idealState.setPartitionState("TestDB0_0", "localhost_12919", "SLAVE");
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager participants[] = new MockParticipantManager[N];
+ for (int i = 0; i < N; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Disable TestDB0
+ enableResource(clusterName, false);
+ checkExternalView(clusterName);
+
+ // Re-enable TestDB0
+ enableResource(clusterName, true);
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ ZK_ADDR, clusterName));
+ Assert.assertTrue(result);
+
+ // Clean up
+ controller.syncStop();
+ for (int i = 0; i < N; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ private void enableResource(String clusterName, boolean enabled) {
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.enableResource(clusterName, "TestDB0", enabled);
+ }
+
+ /**
+ * Check that all partitions of TestDB0 are in the OFFLINE state
+ * @param clusterName name of the cluster whose TestDB0 external view is checked
+ * @throws Exception
+ */
+ private void checkExternalView(String clusterName) throws Exception {
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+
+ // verify that states of TestDB0 are all OFFLINE
+ boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ ExternalView extView = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+ if (extView == null) {
+ return false;
+ }
+ Set<String> partitionSet = extView.getPartitionSet();
+ if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+ return false;
+ }
+ for (String partition : partitionSet) {
+ Map<String, String> instanceStates = extView.getStateMap(partition);
+ for (String state : instanceStates.values()) {
+ if (!"OFFLINE".equals(state)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }, 10 * 1000);
+ Assert.assertTrue(result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 89a947f..d3925be 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
@@ -41,6 +43,7 @@ import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -62,7 +65,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
_gZkClient.deleteRecursive(rootPath);
}
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
tool.addCluster(clusterName, true);
@@ -205,7 +208,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
@@ -241,7 +244,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+ HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
tool.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
@@ -294,4 +297,29 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
+
+ @Test
+ public void testDisableResource() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.addCluster(clusterName, true);
+ Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+ String resourceName = "TestDB";
+ admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
+ StateModelConfigGenerator.generateConfigForMasterSlave()));
+ admin.addResource(clusterName, resourceName, 4, "MasterSlave");
+ admin.enableResource(clusterName, resourceName, false);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+ Assert.assertFalse(idealState.isEnabled());
+ admin.enableResource(clusterName, resourceName, true);
+ idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+ Assert.assertTrue(idealState.isEnabled());
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index d3d6736..a528a20 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -23,7 +23,10 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
@@ -36,8 +39,8 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
-import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
@@ -46,8 +49,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestClusterSetup extends ZkUnitTestBase {
- private static Logger LOG = Logger.getLogger(TestClusterSetup.class);
-
protected static final String CLUSTER_NAME = "TestClusterSetup";
protected static final String TEST_DB = "TestDB";
protected static final String INSTANCE_PREFIX = "instance_";
@@ -437,4 +438,36 @@ public class TestClusterSetup extends ZkUnitTestBase {
}
+ @Test
+ public void testDisableResource() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+ // disable "TestDB0" resource
+ ClusterSetup.processCommandLineArgs(new String[] {
+ "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "false"
+ });
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertFalse(idealState.isEnabled());
+ // enable "TestDB0" resource
+ ClusterSetup.processCommandLineArgs(new String[] {
+ "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "true"
+ });
+ idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ Assert.assertTrue(idealState.isEnabled());
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
}
[7/7] git commit: [HELIX-461] Add partitions without top state metric
Posted by ka...@apache.org.
[HELIX-461] Add partitions without top state metric
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6d30c9c5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6d30c9c5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6d30c9c5
Branch: refs/heads/master
Commit: 6d30c9c583b4b8aa1a74244269fa0f9f8ec87a40
Parents: c1af744
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jun 25 10:00:13 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jul 11 10:10:17 2014 -0700
----------------------------------------------------------------------
.gitignore | 2 ++
.../stages/ExternalViewComputeStage.java | 5 ++++-
.../monitoring/mbeans/ClusterStatusMonitor.java | 15 ++++++++++++---
.../helix/monitoring/mbeans/ResourceMonitor.java | 17 ++++++++++++++++-
.../monitoring/mbeans/ResourceMonitorMBean.java | 2 ++
.../monitoring/mbeans/TestResourceMonitor.java | 6 +++---
6 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2eb84c1..2135638 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,8 @@ target/
/recipes/rabbitmq-consumer-group/target/
.idea
*.iml
+*.ipr
+*.iws
.settings/
out/
.DS_Store
http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 3086a83..8c6b008 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -50,6 +50,7 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
@@ -124,8 +125,10 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
if (clusterStatusMonitor != null
&& !idealState.getStateModelDefRef().equalsIgnoreCase(
DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ StateModelDefinition stateModelDef =
+ cache.getStateModelDef(idealState.getStateModelDefRef());
clusterStatusMonitor.setResourceStatus(view,
- cache._idealStateMap.get(view.getResourceName()));
+ cache._idealStateMap.get(view.getResourceName()), stateModelDef);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 99dee75..1cce342 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -315,21 +316,29 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- public void setResourceStatus(ExternalView externalView, IdealState idealState) {
+ public void setResourceStatus(ExternalView externalView, IdealState idealState,
+ StateModelDefinition stateModelDef) {
+ String topState = null;
+ if (stateModelDef != null) {
+ List<String> priorityList = stateModelDef.getStatesPriorityList();
+ if (!priorityList.isEmpty()) {
+ topState = priorityList.get(0);
+ }
+ }
try {
String resourceName = externalView.getId();
if (!_resourceMbeanMap.containsKey(resourceName)) {
synchronized (this) {
if (!_resourceMbeanMap.containsKey(resourceName)) {
ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
- bean.updateResource(externalView, idealState);
+ bean.updateResource(externalView, idealState, topState);
registerResources(Arrays.asList(bean));
}
}
}
ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
String oldSensorName = bean.getSensorName();
- bean.updateResource(externalView, idealState);
+ bean.updateResource(externalView, idealState, topState);
String newSensorName = bean.getSensorName();
if (!oldSensorName.equals(newSensorName)) {
unregisterResources(Arrays.asList(resourceName));
http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 4739fab..ae936b8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -21,6 +21,7 @@ package org.apache.helix.monitoring.mbeans;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.State;
@@ -30,12 +31,15 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.log4j.Logger;
+import com.google.common.collect.Sets;
+
public class ResourceMonitor implements ResourceMonitorMBean {
private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
private int _numOfPartitions;
private int _numOfPartitionsInExternalView;
private int _numOfErrorPartitions;
+ private int _numNonTopStatePartitions;
private int _externalViewIdealStateDiff;
private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
private String _resourceName;
@@ -57,6 +61,11 @@ public class ResourceMonitor implements ResourceMonitorMBean {
}
@Override
+ public long getMissingTopStatePartitionGauge() {
+ return _numNonTopStatePartitions;
+ }
+
+ @Override
public long getDifferenceWithIdealStateGauge() {
return _externalViewIdealStateDiff;
}
@@ -71,7 +80,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
return _resourceName;
}
- public void updateResource(ExternalView externalView, IdealState idealState) {
+ public void updateResource(ExternalView externalView, IdealState idealState, String topState) {
if (externalView == null) {
LOG.warn("external view is null");
return;
@@ -81,6 +90,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
if (idealState == null) {
LOG.warn("ideal state is null for " + resourceName);
_numOfErrorPartitions = 0;
+ _numNonTopStatePartitions = 0;
_externalViewIdealStateDiff = 0;
_numOfPartitionsInExternalView = 0;
return;
@@ -90,6 +100,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
int numOfErrorPartitions = 0;
int numOfDiff = 0;
+ Set<PartitionId> topStatePartitions = Sets.newHashSet();
if (_numOfPartitions == 0) {
_numOfPartitions = idealState.getRecord().getMapFields().size();
@@ -120,11 +131,15 @@ public class ResourceMonitor implements ResourceMonitorMBean {
.equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
numOfErrorPartitions++;
}
+ if (topState != null && externalViewRecord.get(host).toString().equalsIgnoreCase(topState)) {
+ topStatePartitions.add(partitionId);
+ }
}
}
_numOfErrorPartitions = numOfErrorPartitions;
_externalViewIdealStateDiff = numOfDiff;
_numOfPartitionsInExternalView = externalView.getPartitionIdSet().size();
+ _numNonTopStatePartitions = _numOfPartitions - topStatePartitions.size();
String tag = idealState.getInstanceGroupTag();
if (tag == null || tag.equals("") || tag.equals("null")) {
_tag = ClusterStatusMonitor.DEFAULT_TAG;
http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
index 33b001d..da1e4f8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
@@ -26,6 +26,8 @@ public interface ResourceMonitorMBean extends SensorNameProvider {
public long getErrorPartitionGauge();
+ public long getMissingTopStatePartitionGauge();
+
public long getDifferenceWithIdealStateGauge();
public long getExternalViewPartitionGauge();
http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index dcca755..caefa8a 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -121,7 +121,7 @@ public class TestResourceMonitor {
IdealState idealState = new IdealState(idealStateRecord);
ExternalView externalView = new ExternalView(idealStateRecord);
- monitor.updateResource(externalView, idealState);
+ monitor.updateResource(externalView, idealState, "MASTER");
Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -137,7 +137,7 @@ public class TestResourceMonitor {
externalView.setStateMap(_dbName + "_" + 3 * i, map);
}
- monitor.updateResource(externalView, idealState);
+ monitor.updateResource(externalView, idealState, "MASTER");
Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
Assert.assertEquals(monitor.getErrorPartitionGauge(), m);
Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
@@ -147,7 +147,7 @@ public class TestResourceMonitor {
externalView.getRecord().getMapFields().remove(_dbName + "_" + 4 * i);
}
- monitor.updateResource(externalView, idealState);
+ monitor.updateResource(externalView, idealState, "MASTER");
Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
Assert.assertEquals(monitor.getErrorPartitionGauge(), 3);
Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
[3/7] [HELIX-446] Remove ZkPropertyTransfer and restlet dependency
from helix-core
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 00537a4..08af37b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -109,11 +109,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
// System.out.println("participant watch paths: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher. MESSAGES->HelixTaskExecutor");
// check HelixManager#_handlers
// printHandlers(controllerManager);
@@ -122,8 +122,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
int particHandlerNb = participantManagerToExpire.getHandlers().size();
Assert.assertEquals(controllerHandlerNb, 9,
"HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
- Assert.assertEquals(particHandlerNb, 2,
- "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+ Assert.assertEquals(particHandlerNb, 1,
+ "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
// expire the session of participant
System.out.println("Expiring participant session...");
@@ -164,11 +164,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
// System.out.println("participant watch paths after session expiry: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
// check handlers
// printHandlers(controllerManager);
@@ -249,8 +249,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(controllerHandlerNb, 9,
"HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
+ controllerHandlerNb + ", " + printHandlers(controller));
- Assert.assertEquals(particHandlerNb, 2,
- "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
+ Assert.assertEquals(particHandlerNb, 1,
+ "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
+ particHandlerNb + ", " + printHandlers(participantManager));
// expire controller
@@ -292,11 +292,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
// System.out.println("participant watch paths after session expiry: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
// check HelixManager#_handlers
// printHandlers(controllerManager);
@@ -370,8 +370,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
// check manager#hanlders
- Assert.assertEquals(participantToExpire.getHandlers().size(), 3,
- "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+ Assert.assertEquals(participantToExpire.getHandlers().size(), 2,
+ "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
// check zkclient#listeners
Map<String, Set<IZkDataListener>> dataListeners =
@@ -387,10 +387,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: "
+ path);
Assert
- .assertEquals(
- childListeners.size(),
- 3,
- "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+ .assertEquals(childListeners.size(), 2,
+ "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
@@ -398,20 +396,17 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
Map<String, List<String>> watchPaths =
ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
// System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
Assert
- .assertEquals(
- watchPaths.get("dataWatches").size(),
- 4,
- "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 3,
- "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+ .assertEquals(watchPaths.get("dataWatches").size(), 3,
+ "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
+ "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
// expire localhost_12918
System.out.println("Expire participant: " + participantToExpire.getInstanceName()
@@ -430,8 +425,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
participantToExpire.getHandlers().size(),
- 2,
- "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+ 1,
+ "Should have 1 handlers: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
// check zkclient#listeners
dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
@@ -441,8 +436,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
childListeners.size(),
- 3,
- "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
+ 2,
+ "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). "
+ "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 0,
@@ -451,16 +446,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path),
+ "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
// System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
- "Should have 2 data-watches: CONTROLLER and MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+ "Should have 1 data-watches: MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+ "Should have 1 child-watches: MESSAGES");
Assert
.assertEquals(watchPaths.get("existWatches").size(), 2,
"Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
@@ -479,10 +474,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
// System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
- "Should have 2 data-watches: CONTROLLER and MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+ "Should have 1 data-watches: MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+ "Should have 1 child-watches: MESSAGES");
Assert
.assertEquals(
watchPaths.get("existWatches").size(),
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
deleted file mode 100644
index e065ee3..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.StatusUpdate;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-/**
- * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
- * start 5 dummy participants verify the current states at end
- */
-
-public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase {
- @Override
- @BeforeClass
- public void beforeClass() throws Exception {
- ZKPropertyTransferServer.PERIOD = 500;
- ZkPropertyTransferClient.SEND_PERIOD = 500;
- ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
- super.beforeClass();
-
- Thread.sleep(1000);
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder kb = accessor.keyBuilder();
-
- for (int i = 0; i < NODE_NR; i++) {
- String instanceName = _participants[i].getInstanceName();
- List<StatusUpdate> statusUpdates =
- accessor.getChildValues(kb.stateTransitionStatus(instanceName,
- _participants[i].getSessionId(), TEST_DB));
-
- for (int j = 0; j < 10; j++) {
- statusUpdates =
- accessor.getChildValues(kb.stateTransitionStatus(instanceName,
- _participants[i].getSessionId(), TEST_DB));
- if (statusUpdates.size() == 0) {
- Thread.sleep(500);
- } else {
- break;
- }
- }
- Assert.assertTrue(statusUpdates.size() > 0);
- for (StatusUpdate update : statusUpdates) {
- Assert.assertTrue(update.getRecord()
- .getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER).equals("true"));
- Assert
- .assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
- }
- }
- }
-
- @Override
- @AfterClass
- public void afterClass() throws Exception {
- super.afterClass();
- ZKPropertyTransferServer.getInstance().shutdown();
- ZKPropertyTransferServer.getInstance().reset();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index 8bff2fb..69448dc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -243,8 +243,8 @@ public class TestConsecutiveZkSessionExpiry extends ZkUnitTestBase {
Assert
.assertEquals(
handlers.size(),
- 2,
- "Distributed controller should have 2 handlers (message and data-accessor) after lose leadership, but was "
+ 1,
+ "Distributed controller should have 1 handler (message) after lose leadership, but was "
+ handlers.size());
// clean up
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index aa00a8d..f797d1b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -146,8 +146,8 @@ public class TestDistributedControllerManager extends ZkIntegrationTestBase {
Assert
.assertEquals(
handlers.size(),
- 2,
- "Distributed controller should have 2 handlers (message and data-accessor) after lose leadership, but was "
+ 1,
+ "Distributed controller should have 1 handler (message) after lose leadership, but was "
+ handlers.size());
}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
index 59a3fd9..9b3c32f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
@@ -107,11 +107,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
LOG.debug("participant watch paths: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher.");
// check HelixManager#_handlers
// printHandlers(controllerManager);
@@ -120,8 +120,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
int particHandlerNb = participantManagerToExpire.getHandlers().size();
Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
"HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
- Assert.assertEquals(particHandlerNb, 2,
- "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+ Assert.assertEquals(particHandlerNb, 1,
+ "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
// expire the session of participant
LOG.debug("Expiring participant session...");
@@ -162,11 +162,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
LOG.debug("participant watch paths after session expiry: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
// check handlers
// printHandlers(controllerManager);
@@ -239,8 +239,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
"HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
+ controllerHandlerNb + ", " + TestHelper.printHandlers(controller));
- Assert.assertEquals(particHandlerNb, 2,
- "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
+ Assert.assertEquals(particHandlerNb, 1,
+ "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
+ particHandlerNb + ", " + TestHelper.printHandlers(participantManager));
// expire controller
@@ -283,11 +283,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
LOG.debug("participant watch paths after session expiry: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
// check HelixManager#_handlers
// printHandlers(controllerManager);
@@ -354,8 +354,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
// check manager#hanlders
- Assert.assertEquals(participantToExpire.getHandlers().size(), 3,
- "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+ Assert.assertEquals(participantToExpire.getHandlers().size(), 2,
+ "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
// check zkclient#listeners
Map<String, Set<IZkDataListener>> dataListeners =
@@ -373,8 +373,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
childListeners.size(),
- 3,
- "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+ 2,
+ "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
@@ -382,8 +382,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path),
+ "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
Map<String, List<String>> watchPaths =
@@ -392,10 +392,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
watchPaths.get("dataWatches").size(),
- 4,
- "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 3,
- "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+ 3,
+ "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
+ "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
// expire localhost_12918
System.out.println("Expire participant: " + participantToExpire.getInstanceName()
@@ -414,8 +414,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
participantToExpire.getHandlers().size(),
- 2,
- "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+ 1,
+ "Should have 1 handler: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
// check zkclient#listeners
dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
@@ -425,9 +425,9 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
childListeners.size(),
- 3,
- "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
- + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
+ 2,
+ "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). "
+ + "MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 0,
"Should have no child-listener on path: " + path);
@@ -435,16 +435,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path),
+ "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
- "Should have 2 data-watches: CONTROLLER and MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+ "Should have 1 data-watch: MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+ "Should have 1 child-watch: MESSAGES");
Assert
.assertEquals(watchPaths.get("existWatches").size(), 2,
"Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
@@ -463,10 +463,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
- "Should have 2 data-watches: CONTROLLER and MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+ "Should have 1 data-watch: MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+ "Should have 1 child-watch: MESSAGES");
Assert
.assertEquals(
watchPaths.get("existWatches").size(),
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index 547e863..c2982b2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -19,13 +19,13 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestLiveInstanceBounce extends ZkStandAloneCMTestBase {
@Test
public void testInstanceBounce() throws Exception {
int handlerSize = _controller.getHandlers().size();
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
deleted file mode 100644
index e01dd07..0000000
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestZKPropertyTransferServer extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
- private static Logger LOG = Logger.getLogger(TestZKPropertyTransferServer.class);
-
- @Test
- public void TestControllerChange() throws Exception {
- String controllerName = CONTROLLER_PREFIX + "_0";
- _controller.syncStop();
-
- Thread.sleep(1000);
-
- // kill controller, participant should not know about the svc url
- for (int i = 0; i < NODE_NR; i++) {
- HelixDataAccessor accessor = _participants[i].getHelixDataAccessor();
- ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
- Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl == null
- || zkAccessor._zkPropertyTransferSvcUrl.equals(""));
- }
-
- _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
- _controller.syncStart();
-
- Thread.sleep(1000);
-
- // create controller again, the svc url is notified to the participants
- for (int i = 0; i < NODE_NR; i++) {
- HelixDataAccessor accessor = _participants[i].getHelixDataAccessor();
- ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
- Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl.equals(ZKPropertyTransferServer
- .getInstance().getWebserviceUrl()));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
index aeb32f9..a9c028c 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
@@ -19,12 +19,11 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.testng.Assert;
-import org.testng.annotations.Test;
-public class TestZkStateChangeListener extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
// TODO this test has been covered by TestZkFlapping. check if still needed
// @Test
public void testDisconnectHistory() throws Exception {
[2/7] git commit: [HELIX-430] Fix test failures caused by restlet 2.2
Posted by ka...@apache.org.
[HELIX-430] Fix test failures caused by restlet 2.2
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e914edb6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e914edb6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e914edb6
Branch: refs/heads/master
Commit: e914edb6b1796b82234a2555048654f3ae573369
Parents: 208f1fa
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue May 20 16:26:09 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 10:51:32 2014 -0700
----------------------------------------------------------------------
...x-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy | 4 +-
.../webapp/TestHelixAdminScenariosRest.java | 72 ++++++++++++--------
.../helix-core-0.7.1-incubating-SNAPSHOT.ivy | 6 +-
helix-core/pom.xml | 4 +-
pom.xml | 2 +-
5 files changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy b/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy
index 0478f34..35c6158 100644
--- a/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy
+++ b/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy
@@ -38,8 +38,8 @@ under the License.
</publications>
<dependencies>
<dependency org="org.apache.helix" name="helix-core" rev="0.7.1-incubating-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
- <dependency org="org.restlet" name="org.restlet" rev="1.1.10" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
- <dependency org="com.noelios.restlet" name="com.noelios.restlet" rev="1.1.10" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
+ <dependency org="org.apache.helix" name="helix-core" rev="0.6.4-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
+ <dependency org="org.restlet.jse" name="org.restlet" rev="2.2.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
index 7f44174..8082f04 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
@@ -44,7 +44,6 @@ import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
-import org.apache.helix.webapp.RestAdminApplication;
import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
import org.apache.helix.webapp.resources.InstancesResource.ListInstancesWrapper;
import org.apache.helix.webapp.resources.JsonParameters;
@@ -68,6 +67,8 @@ import org.testng.annotations.Test;
* Simulate all the admin tasks needed by using command line tool
*/
public class TestHelixAdminScenariosRest extends AdminTestBase {
+ private static final int MAX_RETRIES = 5;
+
RestAdminApplication _adminApp;
Component _component;
String _tag1 = "tag1123";
@@ -94,42 +95,53 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
static String assertSuccessPostOperation(String url, Map<String, String> jsonParameters,
boolean hasException) throws IOException {
- Reference resourceRef = new Reference(url);
-
- Request request = new Request(Method.POST, resourceRef);
- request.setEntity(
- JsonParameters.JSON_PARAMETERS + "="
- + ClusterRepresentationUtil.ObjectToJson(jsonParameters), MediaType.APPLICATION_ALL);
- Response response = _gClient.handle(request);
- Representation result = response.getEntity();
- StringWriter sw = new StringWriter();
- result.write(sw);
-
- Assert.assertTrue(response.getStatus().getCode() == Status.SUCCESS_OK.getCode());
- Assert.assertTrue(hasException == sw.toString().toLowerCase().contains("exception"));
- return sw.toString();
+ return assertSuccessPostOperation(url, jsonParameters, null, hasException);
}
static String assertSuccessPostOperation(String url, Map<String, String> jsonParameters,
Map<String, String> extraForm, boolean hasException) throws IOException {
Reference resourceRef = new Reference(url);
- Request request = new Request(Method.POST, resourceRef);
- String entity =
- JsonParameters.JSON_PARAMETERS + "="
- + ClusterRepresentationUtil.ObjectToJson(jsonParameters);
- for (String key : extraForm.keySet()) {
- entity = entity + "&" + (key + "=" + extraForm.get(key));
- }
- request.setEntity(entity, MediaType.APPLICATION_ALL);
- Response response = _gClient.handle(request);
- Representation result = response.getEntity();
- StringWriter sw = new StringWriter();
- result.write(sw);
+ int numRetries = 0;
+ while (numRetries <= MAX_RETRIES) {
+ Request request = new Request(Method.POST, resourceRef);
+
+ if (extraForm != null) {
+ String entity =
+ JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(jsonParameters);
+ for (String key : extraForm.keySet()) {
+ entity = entity + "&" + (key + "=" + extraForm.get(key));
+ }
+ request.setEntity(entity, MediaType.APPLICATION_ALL);
+ } else {
+ request
+ .setEntity(
+ JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(jsonParameters),
+ MediaType.APPLICATION_ALL);
+ }
- Assert.assertTrue(response.getStatus().getCode() == Status.SUCCESS_OK.getCode());
- Assert.assertTrue(hasException == sw.toString().toLowerCase().contains("exception"));
- return sw.toString();
+ Response response = _gClient.handle(request);
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+
+ if (result != null) {
+ result.write(sw);
+ }
+
+ int code = response.getStatus().getCode();
+ boolean successCode =
+ code == Status.SUCCESS_NO_CONTENT.getCode() || code == Status.SUCCESS_OK.getCode();
+ if (successCode || numRetries == MAX_RETRIES) {
+ Assert.assertTrue(successCode);
+ Assert.assertTrue(hasException == sw.toString().toLowerCase().contains("exception"));
+ return sw.toString();
+ }
+ numRetries++;
+ }
+ Assert.fail("Request failed after all retries");
+ return null;
}
void deleteUrl(String url, boolean hasException) throws IOException {
http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy
----------------------------------------------------------------------
diff --git a/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy b/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy
index 137fc06..eb37bed 100644
--- a/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy
+++ b/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy
@@ -54,11 +54,7 @@ under the License.
<dependency org="commons-cli" name="commons-cli" rev="1.2" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="commons-math" name="commons-math" rev="2.1" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.github.sgroschupf" name="zkclient" rev="0.1" conf="compile->compile(default);runtime->runtime(default);default->default"/>
- <dependency org="org.apache.camel" name="camel-josql" rev="2.5.0" conf="compile->compile(default);runtime->runtime(default);default->default"/>
- <dependency org="org.apache.camel" name="camel-core" rev="2.5.0" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
- <dependency org="net.sf.josql" name="gentlyweb-utils" rev="1.5" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
- <dependency org="net.sf.josql" name="josql" rev="1.5" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
- <dependency org="org.fusesource.commonman" name="commons-management" rev="1.0" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
+ <dependency org="org.restlet.jse" name="org.restlet" rev="2.2.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="commons-logging" name="commons-logging-api" rev="1.1" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="org.restlet" name="org.restlet" rev="1.1.10" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="com.noelios.restlet" name="com.noelios.restlet" rev="1.1.10" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index a8414f4..b8e7a9a 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -45,7 +45,7 @@ under the License.
org.apache.zookeeper.txn*;resolution:=optional,
org.apache.zookeeper*;version="[3.3,4)",
org.codehaus.jackson*;version="[1.8,2)",
- org.restlet;version="[2.2.0,3]",
+ org.restlet;version="[2.2.1,3]",
*
</osgi.import>
<osgi.ignore>
@@ -133,7 +133,7 @@ under the License.
<dependency>
<groupId>org.restlet.jse</groupId>
<artifactId>org.restlet</artifactId>
- <version>2.2.0</version>
+ <version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index acb8589..8b45e0a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -335,7 +335,7 @@ under the License.
<dependency>
<groupId>org.restlet.jse</groupId>
<artifactId>org.restlet</artifactId>
- <version>2.2.0</version>
+ <version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>