You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/21 03:14:21 UTC
[2/2] git commit: [HELIX-446] Remove ZkPropertyTransfer and restlet
dependency from helix-core
[HELIX-446] Remove ZkPropertyTransfer and restlet dependency from helix-core
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3146762b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3146762b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3146762b
Branch: refs/heads/helix-0.6.x
Commit: 3146762b2e3c171ed70d1cf5587c2c85992bbfa8
Parents: e8ad448
Author: zzhang <zz...@apache.org>
Authored: Tue May 20 17:56:01 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Tue May 20 17:56:01 2014 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 7 +-
.../java/org/apache/helix/PropertyType.java | 12 -
.../helix/controller/HelixControllerMain.java | 19 --
.../restlet/ZKPropertyTransferServer.java | 248 -------------------
.../controller/restlet/ZNRecordUpdate.java | 83 -------
.../restlet/ZNRecordUpdateResource.java | 77 ------
.../restlet/ZkPropertyTransferApplication.java | 45 ----
.../restlet/ZkPropertyTransferClient.java | 177 -------------
.../helix/controller/restlet/package-info.java | 23 --
.../manager/zk/DistributedLeaderElection.java | 10 -
.../manager/zk/ParticipantManagerHelper.java | 3 -
.../helix/manager/zk/ZKHelixDataAccessor.java | 71 +-----
.../apache/helix/manager/zk/ZKHelixManager.java | 2 -
.../manager/zk/ZNRecordStreamingSerializer.java | 6 +-
.../stages/TestMessageThrottleStage.java | 2 +-
.../helix/integration/TestAutoRebalance.java | 2 +-
.../TestAutoRebalancePartitionLimit.java | 2 +-
.../TestCustomizedIdealStateRebalancer.java | 2 +-
.../helix/integration/TestDisableNode.java | 2 +-
.../helix/integration/TestDisablePartition.java | 2 +-
.../helix/integration/TestDropResource.java | 2 +-
.../helix/integration/TestMessagingService.java | 4 +-
.../helix/integration/TestSchedulerMessage.java | 20 +-
.../integration/TestZkCallbackHandlerLeak.java | 77 +++---
...dAloneCMTestBaseWithPropertyServerCheck.java | 88 -------
.../manager/TestConsecutiveZkSessionExpiry.java | 4 +-
.../TestDistributedControllerManager.java | 4 +-
.../manager/TestZkCallbackHandlerLeak.java | 76 +++---
.../manager/zk/TestLiveInstanceBounce.java | 4 +-
.../zk/TestZKPropertyTransferServer.java | 65 -----
.../manager/zk/TestZkStateChangeListener.java | 4 +-
31 files changed, 113 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index c70e8fc..09e5a22 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -45,7 +45,6 @@ under the License.
org.apache.zookeeper.txn*;resolution:=optional,
org.apache.zookeeper*;version="[3.3,4)",
org.codehaus.jackson*;version="[1.8,2)",
- org.restlet;version="[2.2.1,3]",
*
</osgi.import>
<osgi.ignore>
@@ -131,9 +130,9 @@ under the License.
<version>2.1</version>
</dependency>
<dependency>
- <groupId>org.restlet.jse</groupId>
- <artifactId>org.restlet</artifactId>
- <version>2.2.1</version>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index a391f85..32c9fd3 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -78,8 +78,6 @@ public enum PropertyType {
*/
boolean isCached;
- boolean usePropertyTransferServer;
-
private PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate) {
this(type, isPersistent, mergeOnUpdate, false);
}
@@ -107,7 +105,6 @@ public enum PropertyType {
this.updateOnlyOnExists = updateOnlyOnExists;
this.createOnlyIfAbsent = createOnlyIfAbsent;
this.isCached = isCached;
- this.usePropertyTransferServer = isAsyncWrite;
}
/**
@@ -197,13 +194,4 @@ public enum PropertyType {
public boolean isCached() {
return isCached;
}
-
- /**
- * Check if this property uses a property transfer server
- * @return true if a property transfer server is used, false otherwise
- */
- public boolean usePropertyTransferServer() {
- return usePropertyTransferServer;
- }
-
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 62f3b23..b6c16b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -47,7 +47,6 @@ import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.manager.zk.HelixManagerShutdownHook;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
@@ -58,7 +57,6 @@ public class HelixControllerMain {
public static final String cluster = "cluster";
public static final String help = "help";
public static final String mode = "mode";
- public static final String propertyTransferServicePort = "propertyTransferPort";
public static final String name = "controllerName";
public static final String STANDALONE = "STANDALONE";
public static final String DISTRIBUTED = "DISTRIBUTED";
@@ -101,19 +99,11 @@ public class HelixControllerMain {
controllerNameOption.setRequired(false);
controllerNameOption.setArgName("Cluster controller name (Optional)");
- Option portOption =
- OptionBuilder.withLongOpt(propertyTransferServicePort)
- .withDescription("Webservice port for ZkProperty controller transfer").create();
- portOption.setArgs(1);
- portOption.setRequired(false);
- portOption.setArgName("Cluster controller property transfer port (Optional)");
-
Options options = new Options();
options.addOption(helpOption);
options.addOption(zkServerOption);
options.addOption(clusterOption);
options.addOption(modeOption);
- options.addOption(portOption);
options.addOption(controllerNameOption);
return options;
@@ -208,15 +198,11 @@ public class HelixControllerMain {
String clusterName = cmd.getOptionValue(cluster);
String controllerMode = STANDALONE;
String controllerName = null;
- int propertyTransServicePort = -1;
if (cmd.hasOption(mode)) {
controllerMode = cmd.getOptionValue(mode);
}
- if (cmd.hasOption(propertyTransferServicePort)) {
- propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
- }
if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name)) {
throw new IllegalArgumentException(
"A unique cluster controller name is required in DISTRIBUTED mode");
@@ -228,10 +214,6 @@ public class HelixControllerMain {
logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
+ clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
- if (propertyTransServicePort > 0) {
- ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
- }
-
HelixManager manager =
startHelixController(zkConnectString, clusterName, controllerName, controllerMode);
@@ -244,7 +226,6 @@ public class HelixControllerMain {
+ " interrupted");
} finally {
manager.disconnect();
- ZKPropertyTransferServer.getInstance().shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
deleted file mode 100644
index e415da9..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
+++ /dev/null
@@ -1,248 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-import org.restlet.Component;
-import org.restlet.Context;
-import org.restlet.data.Protocol;
-
-/**
- * Controller side restlet server that receives ZNRecordUpdate requests from
- * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is
- * to optimize the concurrency level of zookeeper access for ZNRecord updates
- * that does not require real-time, like message handling status updates and
- * healthcheck reports.
- * As one server will be used by multiple helix controllers that runs on the same machine,
- * This class is designed as a singleton. Application is responsible to call init()
- * and shutdown() on the getInstance().
- */
-public class ZKPropertyTransferServer {
- public static final String PORT = "port";
- public static String RESTRESOURCENAME = "ZNRecordUpdates";
- public static final String SERVER = "ZKPropertyTransferServer";
-
- // Frequency period for the ZNRecords are batch written to zookeeper
- public static int PERIOD = 10 * 1000;
- // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
- public static int MAX_UPDATE_LIMIT = 10000;
- private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
-
- int _localWebservicePort;
- String _webserviceUrl;
- ZkBaseDataAccessor<ZNRecord> _accessor;
- String _zkAddress;
-
- AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
- new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-
- boolean _initialized = false;
- boolean _shutdownFlag = false;
- Component _component = null;
- Timer _timer = null;
-
- static {
- org.restlet.engine.Engine.setLogLevel(Level.SEVERE);
- }
-
- /**
- * Timertask for zookeeper batched writes
- */
- class ZKPropertyTransferTask extends TimerTask {
- @Override
- public void run() {
- try {
- sendData();
- } catch (Throwable t) {
- LOG.error("", t);
- }
-
- }
- }
-
- void sendData() {
- LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
- ConcurrentHashMap<String, ZNRecordUpdate> updateCache = null;
-
- synchronized (_dataBufferRef) {
- updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- if (updateCache != null) {
- List<String> paths = new ArrayList<String>();
- List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
- List<ZNRecord> vals = new ArrayList<ZNRecord>();
- // BUGBUG : what if the instance is dropped?
- for (ZNRecordUpdate holder : updateCache.values()) {
- paths.add(holder.getPath());
- updaters.add(holder.getZNRecordUpdater());
- vals.add(holder.getRecord());
- }
- // Batch write the accumulated updates into zookeeper
- long timeStart = System.currentTimeMillis();
- if (paths.size() > 0) {
- _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
- }
- LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in "
- + (System.currentTimeMillis() - timeStart) + " ms");
- } else {
- LOG.warn("null _dataQueueRef. Should be in the beginning only");
- }
- }
-
- static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
-
- private ZKPropertyTransferServer() {
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- public static ZKPropertyTransferServer getInstance() {
- return _instance;
- }
-
- public boolean isInitialized() {
- return _initialized;
- }
-
- public void init(int localWebservicePort, String zkAddress) {
- if (!_initialized && !_shutdownFlag) {
- LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
- _localWebservicePort = localWebservicePort;
- ZkClient zkClient = new ZkClient(zkAddress);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
- _zkAddress = zkAddress;
- startServer();
- } else {
- LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: "
- + _shutdownFlag);
- }
- }
-
- public String getWebserviceUrl() {
- if (!_initialized || _shutdownFlag) {
- LOG.debug("inited:" + _initialized + " shutdownFlag:" + _shutdownFlag + " , return");
- return null;
- }
- return _webserviceUrl;
- }
-
- /**
- * Add an ZNRecordUpdate to the change queue.
- * Called by the webservice front-end.
- */
- void enqueueData(ZNRecordUpdate e) {
- if (!_initialized || _shutdownFlag) {
- LOG.error("zkDataTransferServer inited:" + _initialized + " shutdownFlag:" + _shutdownFlag
- + " , return");
- return;
- }
- // Do local merge if receive multiple update on the same path
- synchronized (_dataBufferRef) {
- e.getRecord().setSimpleField(SERVER, _webserviceUrl);
- if (_dataBufferRef.get().containsKey(e.getPath())) {
- ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
- oldVal = e.getZNRecordUpdater().update(oldVal);
- _dataBufferRef.get().get(e.getPath())._record = oldVal;
- } else {
- _dataBufferRef.get().put(e.getPath(), e);
- }
- }
- if (_dataBufferRef.get().size() > MAX_UPDATE_LIMIT) {
- sendData();
- }
- }
-
- void startServer() {
- LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress "
- + _zkAddress);
-
- _component = new Component();
-
- _component.getServers().add(Protocol.HTTP, _localWebservicePort);
- Context applicationContext = _component.getContext().createChildContext();
- applicationContext.getAttributes().put(SERVER, this);
- applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
- ZkPropertyTransferApplication application =
- new ZkPropertyTransferApplication(applicationContext);
- // Attach the application to the component and start it
- _component.getDefaultHost().attach(application);
- _timer = new Timer(true);
- _timer.schedule(new ZKPropertyTransferTask(), PERIOD, PERIOD);
-
- try {
- _webserviceUrl =
- "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":"
- + _localWebservicePort + "/" + RESTRESOURCENAME;
- _component.start();
- _initialized = true;
- } catch (Exception e) {
- LOG.error("", e);
- }
- LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress "
- + _zkAddress);
- }
-
- public void shutdown() {
- if (_shutdownFlag) {
- LOG.error("ZKPropertyTransferServer already has been shutdown...");
- return;
- }
- LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress "
- + _zkAddress);
- if (_timer != null) {
- _timer.cancel();
- }
- if (_component != null) {
- try {
- _component.stop();
- } catch (Exception e) {
- LOG.error("", e);
- }
- }
- _shutdownFlag = true;
- }
-
- public void reset() {
- if (_shutdownFlag == true) {
- _shutdownFlag = false;
- _initialized = false;
- _component = null;
- _timer = null;
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
deleted file mode 100644
index deef748..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordUpdater;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Unit of transfered ZNRecord updates. Contains the ZNRecord Value, zkPath
- * to store the update value, and the property type (used to merge the ZNRecord)
- * For ZNRecord subtraction, it is currently not supported yet.
- */
-public class ZNRecordUpdate {
- public enum OpCode {
- // TODO: create is not supported; but update will create if not exist
- CREATE,
- UPDATE,
- SET
- }
-
- final String _path;
- ZNRecord _record;
- final OpCode _code;
-
- @JsonCreator
- public ZNRecordUpdate(@JsonProperty("path") String path, @JsonProperty("opcode") OpCode code,
- @JsonProperty("record") ZNRecord record) {
- _path = path;
- _record = record;
- _code = code;
- }
-
- public String getPath() {
- return _path;
- }
-
- public ZNRecord getRecord() {
- return _record;
- }
-
- public OpCode getOpcode() {
- return _code;
- }
-
- @JsonIgnore(true)
- public DataUpdater<ZNRecord> getZNRecordUpdater() {
- if (_code == OpCode.SET)
-
- {
- return new ZNRecordUpdater(_record) {
- @Override
- public ZNRecord update(ZNRecord current) {
- return _record;
- }
- };
- } else if ((_code == OpCode.UPDATE)) {
- return new ZNRecordUpdater(_record);
- } else {
- throw new UnsupportedOperationException("Not supported : " + _code);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
deleted file mode 100644
index 33593ae..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.StringReader;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.restlet.data.Form;
-import org.restlet.data.MediaType;
-import org.restlet.data.Status;
-import org.restlet.representation.Representation;
-import org.restlet.representation.Variant;
-import org.restlet.resource.ServerResource;
-
-/**
- * REST resource for ZkPropertyTransfer server to receive PUT requests
- * that submits ZNRecordUpdates
- */
-public class ZNRecordUpdateResource extends ServerResource {
- public static final String UPDATEKEY = "ZNRecordUpdate";
- private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
-
- public ZNRecordUpdateResource() {
- getVariants().add(new Variant(MediaType.TEXT_PLAIN));
- getVariants().add(new Variant(MediaType.APPLICATION_JSON));
- setNegotiated(false);
- }
-
- @Override
- public Representation put(Representation entity) {
- try {
- ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
-
- Form form = new Form(entity);
- String jsonPayload = form.getFirstValue(UPDATEKEY, true);
-
- // Parse the map from zkPath --> ZNRecordUpdate from the payload
- StringReader sr = new StringReader(jsonPayload);
- ObjectMapper mapper = new ObjectMapper();
- TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef = new TypeReference<TreeMap<String, ZNRecordUpdate>>() {
- };
- Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
- // Enqueue the ZNRecordUpdate for sending
- for (ZNRecordUpdate holder : holderMap.values()) {
- server.enqueueData(holder);
- LOG.info("Received " + holder.getPath() + " from " + getRequest().getClientInfo().getAddress());
- }
- getResponse().setStatus(Status.SUCCESS_OK);
- } catch (Exception e) {
- LOG.error("", e);
- getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
deleted file mode 100644
index 68d35cb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.restlet.Application;
-import org.restlet.Context;
-import org.restlet.Restlet;
-import org.restlet.routing.Router;
-
-/**
- * Restlet application for ZkPropertyTransfer server
- */
-public class ZkPropertyTransferApplication extends Application {
- public ZkPropertyTransferApplication() {
- super();
- }
-
- public ZkPropertyTransferApplication(Context context) {
- super(context);
- }
-
- @Override
- public Restlet createInboundRoot() {
- Router router = new Router(getContext());
- router.attach("/" + ZKPropertyTransferServer.RESTRESOURCENAME, ZNRecordUpdateResource.class);
- return router;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
deleted file mode 100644
index 092d845..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.restlet.Client;
-import org.restlet.Request;
-import org.restlet.Response;
-import org.restlet.data.MediaType;
-import org.restlet.data.Method;
-import org.restlet.data.Protocol;
-import org.restlet.data.Reference;
-import org.restlet.data.Status;
-
-public class ZkPropertyTransferClient {
- private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
- public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
- public static int SEND_PERIOD = 10 * 1000;
-
- public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
-
- int _maxConcurrentTasks;
- ExecutorService _executorService;
- Client[] _clients;
- AtomicInteger _requestCount = new AtomicInteger(0);
-
- // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
- AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
- new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
- Timer _timer;
- volatile String _webServiceUrl = "";
-
- static {
- org.restlet.engine.Engine.setLogLevel(Level.SEVERE);
- }
-
- public ZkPropertyTransferClient(int maxConcurrentTasks) {
- _maxConcurrentTasks = maxConcurrentTasks;
- _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
- _clients = new Client[_maxConcurrentTasks];
- for (int i = 0; i < _clients.length; i++) {
- _clients[i] = new Client(Protocol.HTTP);
- }
- _timer = new Timer(true);
- _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
- _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- class SendZNRecordTimerTask extends TimerTask {
- @Override
- public void run() {
- sendUpdateBatch();
- }
- }
-
- public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl) {
- try {
- LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to "
- + webserviceUrl);
- _webServiceUrl = webserviceUrl;
- update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
- synchronized (_dataBufferRef) {
- if (_dataBufferRef.get().containsKey(update._path)) {
- ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
- oldVal = update.getZNRecordUpdater().update(oldVal);
- _dataBufferRef.get().get(update.getPath())._record = oldVal;
- } else {
- _dataBufferRef.get().put(update.getPath(), update);
- }
- }
- } catch (Exception e) {
- LOG.error("", e);
- }
- }
-
- void sendUpdateBatch() {
- LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to "
- + _webServiceUrl);
- Map<String, ZNRecordUpdate> updateCache = null;
-
- synchronized (_dataBufferRef) {
- updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
- }
-
- if (updateCache != null && updateCache.size() > 0) {
- ZNRecordUpdateUploadTask task =
- new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl,
- _clients[_requestCount.intValue() % _maxConcurrentTasks]);
- _requestCount.incrementAndGet();
- _executorService.submit(task);
- LOG.trace("Queue size :" + ((ThreadPoolExecutor) _executorService).getQueue().size());
- }
- }
-
- public void shutdown() {
- LOG.info("Shutting down ZkPropertyTransferClient");
- _executorService.shutdown();
- _timer.cancel();
- for (Client client : _clients) {
- try {
- client.stop();
- } catch (Exception e) {
- LOG.error("", e);
- }
- }
- }
-
- class ZNRecordUpdateUploadTask implements Callable<Void> {
- Map<String, ZNRecordUpdate> _updateMap;
- String _webServiceUrl;
- Client _client;
-
- ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client) {
- _updateMap = update;
- _webServiceUrl = webserviceUrl;
- _client = client;
- }
-
- @Override
- public Void call() throws Exception {
- LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
- long time = System.currentTimeMillis();
- Reference resourceRef = new Reference(_webServiceUrl);
- Request request = new Request(Method.PUT, resourceRef);
-
- ObjectMapper mapper = new ObjectMapper();
- StringWriter sw = new StringWriter();
- try {
- mapper.writeValue(sw, _updateMap);
- } catch (Exception e) {
- LOG.error("", e);
- }
-
- request.setEntity(ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
- // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
- Response response = _client.handle(request);
-
- if (response.getStatus().getCode() != Status.SUCCESS_OK.getCode()) {
- LOG.error("Status : " + response.getStatus());
- }
- LOG.info("Using time : " + (System.currentTimeMillis() - time));
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
deleted file mode 100644
index 0ef7a79..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Restlet server for Helix controller
- *
- */
-package org.apache.helix.controller.restlet;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index 3175e8d..d281ae4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -31,7 +31,6 @@ import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.model.LeaderHistory;
import org.apache.helix.model.LiveInstance;
import org.apache.log4j.Logger;
@@ -122,15 +121,6 @@ public class DistributedLeaderElection implements ControllerChangeListener {
leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
leader.setSessionId(manager.getSessionId());
leader.setHelixVersion(manager.getVersion());
- if (ZKPropertyTransferServer.getInstance() != null) {
- String zkPropertyTransferServiceUrl =
- ZKPropertyTransferServer.getInstance().getWebserviceUrl();
- if (zkPropertyTransferServiceUrl != null) {
- leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
- }
- } else {
- LOG.warn("ZKPropertyTransferServer instnace is null");
- }
boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
if (success) {
return true;
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index b80ae55..1bee2fe 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -27,7 +27,6 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
@@ -43,7 +42,6 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.log4j.Logger;
@@ -271,7 +269,6 @@ public class ParticipantManagerHelper {
_messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
_stateMachineEngine);
_manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
- _manager.addControllerListener(_dataAccessor);
ScheduledTaskStateModelFactory stStateModelFactory =
new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 94e8feb..cacc20d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -18,6 +18,7 @@ package org.apache.helix.manager.zk;
* specific language governing permissions and limitations
* under the License.
*/
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,13 +29,11 @@ import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ControllerChangeListener;
import org.apache.helix.GroupCommit;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
@@ -42,20 +41,15 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordAssembler;
import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordUpdater;
-import org.apache.helix.controller.restlet.ZNRecordUpdate;
-import org.apache.helix.controller.restlet.ZNRecordUpdate.OpCode;
-import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
-import org.apache.helix.model.LiveInstance;
import org.apache.log4j.Logger;
import org.apache.zookeeper.data.Stat;
-public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener {
+public class ZKHelixDataAccessor implements HelixDataAccessor {
private static Logger LOG = Logger.getLogger(ZKHelixDataAccessor.class);
private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
final InstanceType _instanceType;
private final String _clusterName;
private final Builder _propertyKeyBuilder;
- ZkPropertyTransferClient _zkPropertyTransferClient = null;
private final GroupCommit _groupCommit = new GroupCommit();
String _zkPropertyTransferSvcUrl = null;
@@ -100,14 +94,6 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
String path = key.getPath();
int options = constructOptions(type);
- if (type.usePropertyTransferServer()) {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null) {
- ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
- _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
- return true;
- }
- }
-
boolean success = false;
switch (type) {
case IDEALSTATES:
@@ -154,20 +140,12 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
case CURRENTSTATES:
success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
break;
- default:
- if (type.usePropertyTransferServer()) {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null) {
- ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
- _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
-
- return true;
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("getPropertyTransferUrl is null, skip updating the value");
- }
- return true;
- }
+ case STATUSUPDATES:
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Update status. path: " + key.getPath() + ", record: " + value.getRecord());
}
+ break;
+ default:
success = _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
break;
}
@@ -483,39 +461,4 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
List<DataUpdater<ZNRecord>> updaters, int options) {
return _baseDataAccessor.updateChildren(paths, updaters, options);
}
-
- public void shutdown() {
- if (_zkPropertyTransferClient != null) {
- _zkPropertyTransferClient.shutdown();
- }
- }
-
- @Override
- public void onControllerChange(NotificationContext changeContext) {
- LOG.info("Controller has changed");
- refreshZkPropertyTransferUrl();
- if (_zkPropertyTransferClient == null) {
- if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0) {
- LOG.info("Creating ZkPropertyTransferClient as we get url " + _zkPropertyTransferSvcUrl);
- _zkPropertyTransferClient =
- new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
- }
- }
- }
-
- void refreshZkPropertyTransferUrl() {
- try {
- LiveInstance leader = getProperty(keyBuilder().controllerLeader());
- if (leader != null) {
- _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
- LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl + " Controller "
- + leader.getInstanceName());
- } else {
- _zkPropertyTransferSvcUrl = null;
- }
- } catch (Exception e) {
- // LOG.error("", e);
- _zkPropertyTransferSvcUrl = null;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index ed6c8a8..a24ec32 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -518,8 +518,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
// TODO reset user defined handlers only
resetHandlers();
- _dataAccessor.shutdown();
-
if (_leaderElectionHandler != null) {
_leaderElectionHandler.reset();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index bd37d42..2d7cb3c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.codec.binary.Base64;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.log4j.Logger;
@@ -35,7 +36,6 @@ import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
-import org.restlet.engine.util.Base64;
import com.google.common.collect.Maps;
@@ -141,7 +141,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
if (rawPayload != null && rawPayload.length > 0) {
// write rawPayload
g.writeRaw("\n ");
- g.writeStringField("rawPayload", Base64.encode(rawPayload, false));
+ g.writeStringField("rawPayload", new String(Base64.encodeBase64(rawPayload), "UTF-8"));
}
g.writeRaw("\n");
@@ -226,7 +226,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
}
} else if ("rawPayload".equals(fieldname)) {
- rawPayload = Base64.decode(jp.getText());
+ rawPayload = Base64.decodeBase64(jp.getText());
} else {
throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index cdf11fe..3a321cc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -62,7 +62,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
HelixManager manager = new DummyClusterManager(clusterName, accessor);
// ideal state: node0 is MASTER, node1 is SLAVE
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 746b463..4f6f306 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -45,7 +45,7 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestAutoRebalance extends ZkStandAloneCMTestBase {
String db2 = TEST_DB + "2";
String _tag = "SSDSSD";
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 74a5699..f7eaf00 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -43,7 +43,7 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class
.getName());
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 38903c7..26b9cb0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -45,7 +45,7 @@ import org.testng.annotations.Test;
import com.beust.jcommander.internal.Lists;
public class TestCustomizedIdealStateRebalancer extends
- ZkStandAloneCMTestBaseWithPropertyServerCheck {
+ ZkStandAloneCMTestBase {
String db2 = TEST_DB + "2";
static boolean testRebalancerCreated = false;
static boolean testRebalancerInvoked = false;
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
index be523d0..ca54fb3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
@@ -25,7 +25,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestDisableNode extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDisableNode extends ZkStandAloneCMTestBase {
@Test()
public void testDisableNode() throws Exception {
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
index fcfc744..83d7b14 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -46,7 +46,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDisablePartition extends ZkStandAloneCMTestBase {
private static Logger LOG = Logger.getLogger(TestDisablePartition.class);
@Test()
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
index 0d02d12..047ea75 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
@@ -26,7 +26,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDropResource extends ZkStandAloneCMTestBase {
@Test()
public void testDropResource() throws Exception {
// add a resource to be dropped
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 457b5fb..704d4d3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -36,7 +36,7 @@ import org.apache.helix.model.Message.MessageType;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
-public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestMessagingService extends ZkStandAloneCMTestBase {
public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
public static HashSet<String> _processedMsgIds = new HashSet<String>();
@@ -166,8 +166,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
.get("ReplyMessage"));
if (message.getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString())
.get("ReplyMessage") == null) {
- int x = 0;
- x++;
}
_replyedMessageContents.add(message.getRecord()
.getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage"));
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 49fd98c..7cd64d7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -62,7 +62,7 @@ import org.codehaus.jackson.map.SerializationConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
class MockAsyncCallback extends AsyncCallback {
Message _message;
@@ -189,7 +189,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsgUsingQueue() throws Exception {
+ public void testSchedulerMsgUsingQueue() throws Exception {
Logger.getRootLogger().setLevel(Level.INFO);
_factory._results.clear();
Thread.sleep(2000);
@@ -278,7 +278,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsg() throws Exception {
+ public void testSchedulerMsg() throws Exception {
Logger.getRootLogger().setLevel(Level.INFO);
_factory._results.clear();
HelixManager manager = null;
@@ -382,7 +382,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
PropertyType.STATUSUPDATES);
subPaths = _gZkClient.getChildren(instanceStatusPath);
- Assert.assertTrue(subPaths.size() > 0);
+ Assert.assertTrue(subPaths.size() == 0);
for (String subPath : subPaths) {
String nextPath = instanceStatusPath + "/" + subPath;
List<String> subsubPaths = _gZkClient.getChildren(nextPath);
@@ -405,7 +405,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
subPaths = _gZkClient.getChildren(instanceStatusPath);
- Assert.assertTrue(subPaths.size() > 0);
+ Assert.assertTrue(subPaths.size() == 0);
for (String subPath : subPaths) {
String nextPath = instanceStatusPath + "/" + subPath;
List<String> subsubPaths = _gZkClient.getChildren(nextPath);
@@ -418,7 +418,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsg2() throws Exception {
+ public void testSchedulerMsg2() throws Exception {
_factory._results.clear();
Thread.sleep(2000);
HelixManager manager = null;
@@ -510,7 +510,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerZeroMsg() throws Exception {
+ public void testSchedulerZeroMsg() throws Exception {
TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
@@ -580,7 +580,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsg3() throws Exception {
+ public void testSchedulerMsg3() throws Exception {
_factory._results.clear();
Thread.sleep(2000);
HelixManager manager = null;
@@ -702,7 +702,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test()
- public void TestSchedulerMsg4() throws Exception {
+ public void testSchedulerMsg4() throws Exception {
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
@@ -851,7 +851,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
@Test
- public void TestSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException,
+ public void testSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException,
IOException, InterruptedException {
TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
HelixManager manager = null;
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 7759761..d874fcf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -104,11 +104,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
// System.out.println("participant watch paths: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher. MESSAGES->HelixTaskExecutor");
// check HelixManager#_handlers
// printHandlers(controllerManager);
@@ -117,8 +117,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
int particHandlerNb = participantManagerToExpire.getHandlers().size();
Assert.assertEquals(controllerHandlerNb, 9,
"HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
- Assert.assertEquals(particHandlerNb, 2,
- "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+ Assert.assertEquals(particHandlerNb, 1,
+ "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
// expire the session of participant
System.out.println("Expiring participant session...");
@@ -159,11 +159,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
// System.out.println("participant watch paths after session expiry: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
// check handlers
// printHandlers(controllerManager);
@@ -244,8 +244,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(controllerHandlerNb, 9,
"HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
+ controllerHandlerNb + ", " + printHandlers(controller));
- Assert.assertEquals(particHandlerNb, 2,
- "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
+ Assert.assertEquals(particHandlerNb, 1,
+ "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
+ particHandlerNb + ", " + printHandlers(participantManager));
// expire controller
@@ -287,11 +287,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
// System.out.println("participant watch paths after session expiry: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
// check HelixManager#_handlers
// printHandlers(controllerManager);
@@ -365,8 +365,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
// check manager#hanlders
- Assert.assertEquals(participantToExpire.getHandlers().size(), 3,
- "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+ Assert.assertEquals(participantToExpire.getHandlers().size(), 2,
+ "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
// check zkclient#listeners
Map<String, Set<IZkDataListener>> dataListeners =
@@ -382,10 +382,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: "
+ path);
Assert
- .assertEquals(
- childListeners.size(),
- 3,
- "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+ .assertEquals(childListeners.size(), 2,
+ "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
@@ -393,20 +391,17 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
Map<String, List<String>> watchPaths =
ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
// System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
Assert
- .assertEquals(
- watchPaths.get("dataWatches").size(),
- 4,
- "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 3,
- "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+ .assertEquals(watchPaths.get("dataWatches").size(), 3,
+ "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
+ "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
// expire localhost_12918
System.out.println("Expire participant: " + participantToExpire.getInstanceName()
@@ -425,8 +420,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
participantToExpire.getHandlers().size(),
- 2,
- "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+ 1,
+ "Should have 1 handlers: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
// check zkclient#listeners
dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
@@ -436,8 +431,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
childListeners.size(),
- 3,
- "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
+ 2,
+ "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). "
+ "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 0,
@@ -446,16 +441,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path),
+ "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
// System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
- "Should have 2 data-watches: CONTROLLER and MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+ "Should have 1 data-watches: MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+ "Should have 1 child-watches: MESSAGES");
Assert
.assertEquals(watchPaths.get("existWatches").size(), 2,
"Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
@@ -474,10 +469,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
// System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
- "Should have 2 data-watches: CONTROLLER and MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+ "Should have 1 data-watches: MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+ "Should have 1 child-watches: MESSAGES");
Assert
.assertEquals(
watchPaths.get("existWatches").size(),
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
deleted file mode 100644
index c6fbea6..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.StatusUpdate;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-/**
- * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
- * start 5 dummy participants verify the current states at end
- */
-
-public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase {
- @Override
- @BeforeClass
- public void beforeClass() throws Exception {
- ZKPropertyTransferServer.PERIOD = 500;
- ZkPropertyTransferClient.SEND_PERIOD = 500;
- ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
- super.beforeClass();
-
- Thread.sleep(1000);
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder kb = accessor.keyBuilder();
-
- for (int i = 0; i < NODE_NR; i++) {
- String instanceName = _participants[i].getInstanceName();
- List<StatusUpdate> statusUpdates =
- accessor.getChildValues(kb.stateTransitionStatus(instanceName,
- _participants[i].getSessionId(), TEST_DB));
-
- for (int j = 0; j < 10; j++) {
- statusUpdates =
- accessor.getChildValues(kb.stateTransitionStatus(instanceName,
- _participants[i].getSessionId(), TEST_DB));
- if (statusUpdates.size() == 0) {
- Thread.sleep(500);
- } else {
- break;
- }
- }
- Assert.assertTrue(statusUpdates.size() > 0);
- for (StatusUpdate update : statusUpdates) {
- Assert.assertTrue(update.getRecord()
- .getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER).equals("true"));
- Assert
- .assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
- }
- }
- }
-
- @Override
- @AfterClass
- public void afterClass() throws Exception {
- super.afterClass();
- ZKPropertyTransferServer.getInstance().shutdown();
- ZKPropertyTransferServer.getInstance().reset();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index 8bff2fb..69448dc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -243,8 +243,8 @@ public class TestConsecutiveZkSessionExpiry extends ZkUnitTestBase {
Assert
.assertEquals(
handlers.size(),
- 2,
- "Distributed controller should have 2 handlers (message and data-accessor) after lose leadership, but was "
+ 1,
+ "Distributed controller should have 1 handler (message) after lose leadership, but was "
+ handlers.size());
// clean up
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index aa00a8d..f797d1b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -146,8 +146,8 @@ public class TestDistributedControllerManager extends ZkIntegrationTestBase {
Assert
.assertEquals(
handlers.size(),
- 2,
- "Distributed controller should have 2 handlers (message and data-accessor) after lose leadership, but was "
+ 1,
+ "Distributed controller should have 1 handler (message) after lose leadership, but was "
+ handlers.size());
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3146762b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
index 59a3fd9..9b3c32f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
@@ -107,11 +107,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
LOG.debug("participant watch paths: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher.");
// check HelixManager#_handlers
// printHandlers(controllerManager);
@@ -120,8 +120,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
int particHandlerNb = participantManagerToExpire.getHandlers().size();
Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
"HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
- Assert.assertEquals(particHandlerNb, 2,
- "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+ Assert.assertEquals(particHandlerNb, 1,
+ "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
// expire the session of participant
LOG.debug("Expiring participant session...");
@@ -162,11 +162,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
LOG.debug("participant watch paths after session expiry: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
// check handlers
// printHandlers(controllerManager);
@@ -239,8 +239,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
"HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
+ controllerHandlerNb + ", " + TestHelper.printHandlers(controller));
- Assert.assertEquals(particHandlerNb, 2,
- "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
+ Assert.assertEquals(particHandlerNb, 1,
+ "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
+ particHandlerNb + ", " + TestHelper.printHandlers(participantManager));
// expire controller
@@ -283,11 +283,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
LOG.debug("participant watch paths after session expiry: " + watchPaths);
- // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
- return watchPaths.size() == 2;
+ // participant should have 1 zk-watcher: 1 for MESSAGE
+ return watchPaths.size() == 1;
}
}, 500);
- Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+ Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
// check HelixManager#_handlers
// printHandlers(controllerManager);
@@ -354,8 +354,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
// check manager#hanlders
- Assert.assertEquals(participantToExpire.getHandlers().size(), 3,
- "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+ Assert.assertEquals(participantToExpire.getHandlers().size(), 2,
+ "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
// check zkclient#listeners
Map<String, Set<IZkDataListener>> dataListeners =
@@ -373,8 +373,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
childListeners.size(),
- 3,
- "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+ 2,
+ "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
@@ -382,8 +382,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path),
+ "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
Map<String, List<String>> watchPaths =
@@ -392,10 +392,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
watchPaths.get("dataWatches").size(),
- 4,
- "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 3,
- "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+ 3,
+ "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
+ "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
// expire localhost_12918
System.out.println("Expire participant: " + participantToExpire.getInstanceName()
@@ -414,8 +414,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
participantToExpire.getHandlers().size(),
- 2,
- "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+ 1,
+ "Should have 1 handler: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
// check zkclient#listeners
dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
@@ -425,9 +425,9 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert
.assertEquals(
childListeners.size(),
- 3,
- "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
- + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
+ 2,
+ "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). "
+ + "MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
Assert.assertEquals(childListeners.get(path).size(), 0,
"Should have no child-listener on path: " + path);
@@ -435,16 +435,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
Assert.assertEquals(childListeners.get(path).size(), 1,
"Should have 1 child-listener on path: " + path);
path = keyBuilder.controller().getPath();
- Assert.assertEquals(childListeners.get(path).size(), 1,
- "Should have 1 child-listener on path: " + path);
+ Assert.assertNull(childListeners.get(path),
+ "Should have no child-listener on path: " + path);
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
- "Should have 2 data-watches: CONTROLLER and MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+ "Should have 1 data-watch: MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+ "Should have 1 child-watch: MESSAGES");
Assert
.assertEquals(watchPaths.get("existWatches").size(), 2,
"Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
@@ -463,10 +463,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
// check zookeeper#watches on client side
watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
- Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
- "Should have 2 data-watches: CONTROLLER and MESSAGES");
- Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
- "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+ "Should have 1 data-watch: MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+ "Should have 1 child-watch: MESSAGES");
Assert
.assertEquals(
watchPaths.get("existWatches").size(),