You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/08/28 20:00:15 UTC
[1/4] [HELIX-470] Netty-based IPC layer
Repository: helix
Updated Branches:
refs/heads/master 59b4bbb0b -> d8ec1ae75
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java b/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
new file mode 100644
index 0000000..b399377
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
@@ -0,0 +1,353 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.ipc.netty.NettyHelixIPCService;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.helix.resolver.HelixResolver;
+import org.apache.helix.resolver.zk.ZKHelixResolver;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestNettyHelixIPCService extends ZkTestBase {
+
+ private static final Logger LOG = Logger.getLogger(TestNettyHelixIPCService.class);
+
+ private static final String CLUSTER_NAME = "TEST_CLUSTER";
+ private static final String RESOURCE_NAME = "MyResource";
+
+ private int firstPort;
+ private int secondPort;
+ private HelixManager controller;
+ private HelixManager firstNode;
+ private HelixManager secondNode;
+ private HelixResolver firstResolver;
+ private HelixResolver secondResolver;
+
+ /**
+ * Builds the fixture shared by the tests in this class: a cluster with two participants named
+ * "localhost_" + port, a standalone controller, a 4-partition OnlineOffline resource, one
+ * connected resolver per node, and each node's IPC port stored in its participant config.
+ */
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ // Allocate test resources
+ firstPort = TestHelper.getRandomPort();
+ secondPort = TestHelper.getRandomPort();
+
+ // Setup cluster
+ ClusterSetup clusterSetup = new ClusterSetup(_zkaddr);
+ clusterSetup.addCluster(CLUSTER_NAME, true);
+ clusterSetup.addInstanceToCluster(CLUSTER_NAME, "localhost_" + firstPort);
+ clusterSetup.addInstanceToCluster(CLUSTER_NAME, "localhost_" + secondPort);
+
+ // Start Helix agents
+ controller =
+ HelixControllerMain.startHelixController(_zkaddr, CLUSTER_NAME, "CONTROLLER", "STANDALONE");
+ firstNode =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "localhost_" + firstPort,
+ InstanceType.PARTICIPANT, _zkaddr);
+ secondNode =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "localhost_" + secondPort,
+ InstanceType.PARTICIPANT, _zkaddr);
+
+ // Connect participants
+ // (the state model factory is registered before connect() on each node)
+ firstNode.getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.from("OnlineOffline"), new DummyStateModelFactory());
+ firstNode.connect();
+ secondNode.getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.from("OnlineOffline"), new DummyStateModelFactory());
+ secondNode.connect();
+
+ // Add a resource
+ // (4 partitions; the trailing 1 is presumably the replica count -- confirm against ClusterSetup)
+ clusterSetup.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, 4, "OnlineOffline");
+ clusterSetup.rebalanceResource(CLUSTER_NAME, RESOURCE_NAME, 1);
+
+ // Wait for External view convergence
+ // NOTE(review): the boolean result of verifyByZkCallback is ignored, so a timeout is not detected
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ _zkaddr, CLUSTER_NAME), 10000);
+
+ // Connect resolvers
+ firstResolver = new ZKHelixResolver(_zkaddr);
+ firstResolver.connect();
+ secondResolver = new ZKHelixResolver(_zkaddr);
+ secondResolver.connect();
+
+ // Configure
+ // (stores each node's IPC port under HelixIPCService.IPC_PORT at participant scope)
+ firstNode.getConfigAccessor().set(
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+ .forCluster(firstNode.getClusterName()).forParticipant(firstNode.getInstanceName())
+ .build(), HelixIPCService.IPC_PORT, String.valueOf(firstPort));
+ secondNode.getConfigAccessor().set(
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+ .forCluster(secondNode.getClusterName()).forParticipant(secondNode.getInstanceName())
+ .build(), HelixIPCService.IPC_PORT, String.valueOf(secondPort));
+ }
+
+  /**
+   * Tears down what {@code beforeClass()} connected: both resolvers, both participants, and the
+   * controller.
+   */
+  @AfterClass
+  public void afterClass() throws Exception {
+    // The resolvers are connected in beforeClass() and must be released as well; the null checks
+    // cover a beforeClass() that failed before reaching the resolver setup.
+    if (firstResolver != null) {
+      firstResolver.disconnect();
+    }
+    if (secondResolver != null) {
+      secondResolver.disconnect();
+    }
+    firstNode.disconnect();
+    secondNode.disconnect();
+    controller.disconnect();
+  }
+
+  /**
+   * Sends {@code numMessages} messages per partition hosted on the second node, once through the
+   * first node's IPC service and once through the second node's own service (loopback), then
+   * checks that the second node's callback counted both batches for every partition.
+   *
+   * Both IPC services are shut down in {@code finally} blocks so that a failed assertion does not
+   * leave them running on the ports the other test in this class also uses.
+   */
+  @Test
+  public void testService() throws Exception {
+    final int numMessages = 1000;
+    final int messageType = 1;
+
+    // Start first IPC service w/ counter
+    final ConcurrentMap<String, AtomicInteger> firstCounts =
+        new ConcurrentHashMap<String, AtomicInteger>();
+    final HelixIPCService firstIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            firstNode.getInstanceName()).setPort(firstPort));
+    firstIPC.registerCallback(messageType, new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        String key = scope.getPartition() + ":" + scope.getState();
+        firstCounts.putIfAbsent(key, new AtomicInteger());
+        firstCounts.get(key).incrementAndGet();
+      }
+    });
+    firstIPC.start();
+
+    try {
+      // Start second IPC Service w/ counter
+      final ConcurrentMap<String, AtomicInteger> secondCounts =
+          new ConcurrentHashMap<String, AtomicInteger>();
+      final HelixIPCService secondIPC =
+          new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+              secondNode.getInstanceName()).setPort(secondPort));
+      secondIPC.registerCallback(messageType, new HelixIPCCallback() {
+        @Override
+        public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+          String key = scope.getPartition() + ":" + scope.getState();
+          secondCounts.putIfAbsent(key, new AtomicInteger());
+          secondCounts.get(key).incrementAndGet();
+        }
+      });
+      secondIPC.start();
+
+      try {
+        // Allow resolver callbacks to fire
+        Thread.sleep(500);
+
+        // Find all partitions on second node...
+        String secondName = "localhost_" + secondPort;
+        Set<String> secondPartitions = new HashSet<String>();
+        IdealState idealState =
+            controller.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME,
+                RESOURCE_NAME);
+        for (String partitionName : idealState.getPartitionSet()) {
+          if (idealState.getInstanceStateMap(partitionName).containsKey(secondName)) {
+            secondPartitions.add(partitionName);
+          }
+        }
+
+        // And use first node to send messages to them
+        for (String partitionName : secondPartitions) {
+          for (int i = 0; i < numMessages; i++) {
+            HelixMessageScope scope =
+                new HelixMessageScope.Builder().cluster(firstNode.getClusterName())
+                    .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+
+            Set<HelixAddress> destinations = firstResolver.getDestinations(scope);
+            for (HelixAddress destination : destinations) {
+              ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+              firstIPC.send(destination, messageType, UUID.randomUUID(), message);
+            }
+          }
+        }
+
+        // Loopback
+        for (String partitionName : secondPartitions) {
+          for (int i = 0; i < numMessages; i++) {
+            HelixMessageScope scope =
+                new HelixMessageScope.Builder().cluster(secondNode.getClusterName())
+                    .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+
+            Set<HelixAddress> destinations = secondResolver.getDestinations(scope);
+            for (HelixAddress destination : destinations) {
+              ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+              secondIPC.send(destination, messageType, UUID.randomUUID(), message);
+            }
+          }
+        }
+
+        // Check
+        Thread.sleep(500); // just in case
+        for (String partitionName : secondPartitions) {
+          AtomicInteger count = secondCounts.get(partitionName + ":ONLINE");
+          Assert.assertNotNull(count);
+          Assert.assertEquals(count.get(), 2 * numMessages);
+        }
+      } finally {
+        // Shutdown, even when an assertion above failed
+        secondIPC.shutdown();
+      }
+    } finally {
+      firstIPC.shutdown();
+    }
+  }
+
+  /**
+   * Exercises {@link HelixIPCMessageManager} on top of the first node's IPC service. Each receiver
+   * acknowledges a message only when its random draw is even, and the test then expects, after a
+   * fixed wait, one ack (or error) per message sent.
+   *
+   * The scheduler, the message manager and both IPC services are released in {@code finally}
+   * blocks; previously the scheduler was never shut down and nothing was released when the
+   * assertion failed.
+   */
+  @Test
+  public void testMessageManager() throws Exception {
+    final int numMessages = 1000;
+    final int messageType = 1;
+    final int ackMessageType = 2;
+
+    // First IPC service
+    final HelixIPCService firstIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            firstNode.getInstanceName()).setPort(firstPort));
+    firstIPC.registerCallback(messageType, new HelixIPCCallback() {
+      final Random random = new Random();
+
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        if (random.nextInt() % 2 == 0) {
+          HelixAddress sender = firstResolver.getSource(scope);
+          firstIPC.send(sender, ackMessageType, messageId, null);
+        }
+      }
+    });
+    firstIPC.start();
+
+    try {
+      // Second IPC service
+      final HelixIPCService secondIPC =
+          new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+              secondNode.getInstanceName()).setPort(secondPort));
+      secondIPC.registerCallback(messageType, new HelixIPCCallback() {
+        final Random random = new Random();
+
+        @Override
+        public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+          if (random.nextInt() % 2 == 0) {
+            HelixAddress sender = secondResolver.getSource(scope);
+            secondIPC.send(sender, ackMessageType, messageId, null);
+          }
+        }
+      });
+      secondIPC.start();
+
+      // Fully qualified because the file does not import ScheduledExecutorService
+      final java.util.concurrent.ScheduledExecutorService scheduler =
+          Executors.newSingleThreadScheduledExecutor();
+      try {
+        // Allow resolver callbacks to fire
+        Thread.sleep(500);
+
+        // Start state machine (uses first, sends to second)
+        final AtomicInteger numAcks = new AtomicInteger();
+        // NOTE(review): nothing in this test ever increments numErrors -- confirm intent
+        final AtomicInteger numErrors = new AtomicInteger();
+        HelixIPCService messageManager = new HelixIPCMessageManager(scheduler, firstIPC, 300, -1);
+        messageManager.registerCallback(ackMessageType, new HelixIPCCallback() {
+          @Override
+          public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+            numAcks.incrementAndGet();
+          }
+        });
+        messageManager.start();
+
+        try {
+          // Find all partitions on second node...
+          String secondName = "localhost_" + secondPort;
+          Set<String> secondPartitions = new HashSet<String>();
+          IdealState idealState =
+              controller.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME,
+                  RESOURCE_NAME);
+          for (String partitionName : idealState.getPartitionSet()) {
+            if (idealState.getInstanceStateMap(partitionName).containsKey(secondName)) {
+              secondPartitions.add(partitionName);
+            }
+          }
+
+          // And use first node to send messages to them
+          for (String partitionName : secondPartitions) {
+            for (int i = 0; i < numMessages; i++) {
+              HelixMessageScope scope =
+                  new HelixMessageScope.Builder().cluster(firstNode.getClusterName())
+                      .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+              Set<HelixAddress> destinations = firstResolver.getDestinations(scope);
+              for (HelixAddress destination : destinations) {
+                ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+                messageManager.send(destination, messageType, UUID.randomUUID(), message);
+              }
+            }
+          }
+
+          // Ensure they're all ack'ed (tests retry logic because only every other one is acked)
+          Thread.sleep(5000);
+          Assert.assertEquals(numAcks.get() + numErrors.get(),
+              numMessages * secondPartitions.size());
+        } finally {
+          messageManager.shutdown();
+        }
+      } finally {
+        // Stop the scheduler thread; harmless if the message manager already stopped it
+        scheduler.shutdownNow();
+        secondIPC.shutdown();
+      }
+    } finally {
+      firstIPC.shutdown();
+    }
+  }
+
+ /**
+ * State transition handler factory used by the test participants; every partition gets a new
+ * {@link DummyStateModel}.
+ */
+ public static class DummyStateModelFactory extends
+ StateTransitionHandlerFactory<TransitionHandler> {
+ @Override
+ public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
+ return new DummyStateModel();
+ }
+
+ /**
+ * OFFLINE/ONLINE state model whose transitions only log the incoming message.
+ */
+ @StateModelInfo(states = "{'OFFLINE', 'ONLINE'}", initialState = "OFFLINE")
+ public static class DummyStateModel extends TransitionHandler {
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void fromOfflineToOnline(Message message, NotificationContext context) {
+ LOG.info(message);
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void fromOnlineToOffline(Message message, NotificationContext context) {
+ LOG.info(message);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
new file mode 100644
index 0000000..2d682f7
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
@@ -0,0 +1,221 @@
+package org.apache.helix.ipc.benchmark;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.MXBean;
+import javax.management.ObjectName;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.helix.ipc.HelixIPCCallback;
+import org.apache.helix.ipc.HelixIPCService;
+import org.apache.helix.ipc.netty.NettyHelixIPCService;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Run with following to enable JMX:
+ * -Dcom.sun.management.jmxremote <br/>
+ * -Dcom.sun.management.jmxremote.port=10000 <br/>
+ * -Dcom.sun.management.jmxremote.authenticate=false <br/>
+ * -Dcom.sun.management.jmxremote.ssl=false <br/>
+ */
+public class BenchmarkDriver implements Runnable {
+
+ private static final int MESSAGE_TYPE = 1025;
+
+ private final int port;
+ private final int numPartitions;
+ private final AtomicBoolean isShutdown;
+ private final byte[] messageBytes;
+ private final int numConnections;
+
+ private HelixIPCService ipcService;
+ private String localhost;
+ private Thread[] trafficThreads;
+
+  /**
+   * Creates a driver that starts with traffic stopped.
+   *
+   * @param port local port the IPC service is configured with
+   * @param numPartitions number of partition names ("PARTITION_0", ...) each traffic thread
+   *          cycles through
+   * @param numThreads number of traffic-generating threads started by {@code startTraffic}
+   * @param messageSize payload size in bytes; values below zero are treated as zero
+   * @param numConnections value passed to the IPC service config's {@code setNumConnections}
+   */
+  public BenchmarkDriver(int port, int numPartitions, int numThreads, int messageSize,
+      int numConnections) {
+    this.port = port;
+    this.numPartitions = numPartitions;
+    this.isShutdown = new AtomicBoolean(true);
+    this.trafficThreads = new Thread[numThreads];
+    this.numConnections = numConnections;
+
+    // Fill the payload directly with 'A' bytes instead of encoding a String, so the payload is
+    // exactly messageSize bytes regardless of the platform default charset.
+    this.messageBytes = new byte[Math.max(messageSize, 0)];
+    java.util.Arrays.fill(this.messageBytes, (byte) 'A');
+  }
+
+ /**
+ * Registers the JMX {@link Controller} MBean under "org.apache.helix:type=BenchmarkDriver", then
+ * creates and starts the local IPC service on the configured port. Traffic is not started here;
+ * it is driven through the MBean's startTraffic/stopTraffic operations.
+ *
+ * @throws RuntimeException wrapping any exception raised during registration or startup
+ */
+ @Override
+ public void run() {
+ try {
+ // Register controller MBean
+ final BenchmarkDriver driver = this;
+ ManagementFactory.getPlatformMBeanServer().registerMBean(new Controller() {
+ @Override
+ public void startTraffic(String remoteHost, int remotePort) {
+ driver.startTraffic(remoteHost, remotePort);
+ }
+
+ @Override
+ public void stopTraffic() {
+ driver.stopTraffic();
+ }
+ }, new ObjectName("org.apache.helix:type=BenchmarkDriver"));
+
+ // The local server
+ localhost = InetAddress.getLocalHost().getCanonicalHostName();
+ ipcService =
+ new NettyHelixIPCService(new NettyHelixIPCService.Config()
+ .setInstanceName(localhost + "_" + port).setPort(port)
+ .setNumConnections(numConnections));
+
+ // Callback for benchmark messages; it does nothing with them (no counting, no acks)
+ ipcService.registerCallback(MESSAGE_TYPE, new HelixIPCCallback() {
+ @Override
+ public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+ // Do nothing
+ }
+ });
+
+ ipcService.start();
+ System.out.println("Started IPC service on "
+ + InetAddress.getLocalHost().getCanonicalHostName() + ":" + port);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+  /**
+   * Starts the traffic threads unless traffic is already running. Each thread allocates one
+   * payload buffer and repeatedly sends it to the remote node, once per partition, until
+   * {@code stopTraffic} flips the shutdown flag.
+   *
+   * @param remoteHost host name of the node to send to
+   * @param remotePort IPC port of the node to send to
+   */
+  private void startTraffic(final String remoteHost, final int remotePort) {
+    if (isShutdown.getAndSet(false)) {
+      System.out.println("Starting " + trafficThreads.length + " threads to generate traffic");
+      for (int i = 0; i < trafficThreads.length; i++) {
+        Thread t = new Thread() {
+          @Override
+          public void run() {
+            ByteBuf m = ByteBufAllocator.DEFAULT.buffer(messageBytes.length);
+            try {
+              m.writeBytes(messageBytes);
+              while (!isShutdown.get()) {
+                for (int p = 0; p < numPartitions; p++) {
+                  HelixMessageScope scope =
+                      new HelixMessageScope.Builder().cluster("CLUSTER").resource("RESOURCE")
+                          .partition("PARTITION_" + p).sourceInstance(localhost + "_" + port)
+                          .build();
+
+                  Set<HelixAddress> destinations =
+                      ImmutableSet.of(new HelixAddress(scope, remoteHost + "_" + remotePort,
+                          new InetSocketAddress(remoteHost, remotePort)));
+
+                  UUID uuid = UUID.randomUUID();
+
+                  try {
+                    for (HelixAddress destination : destinations) {
+                      // One extra reference per send; presumably send() releases it -- confirm
+                      m.retain();
+                      ipcService.send(destination, MESSAGE_TYPE, uuid, m);
+                    }
+                  } catch (Exception e) {
+                    e.printStackTrace();
+                  }
+                }
+              }
+            } finally {
+              // Drop the reference obtained from buffer(); without this the buffer is never
+              // returned to the allocator once the thread stops.
+              m.release();
+            }
+          }
+        };
+        t.start();
+        trafficThreads[i] = t;
+      }
+      System.out.println("Started traffic to " + remoteHost + ":" + remotePort);
+    }
+  }
+
+  /**
+   * Signals the traffic threads to stop and waits for each of them to finish. Does nothing when
+   * traffic is not running. If the waiting thread is interrupted, the interrupt flag is restored
+   * and the wait is abandoned.
+   */
+  private void stopTraffic() {
+    if (!isShutdown.getAndSet(true)) {
+      try {
+        for (Thread t : trafficThreads) {
+          // A slot can still be empty if startTraffic has not finished filling the array
+          if (t != null) {
+            t.join();
+          }
+        }
+        System.out.println("Stopped traffic");
+      } catch (InterruptedException e) {
+        // Preserve the interrupt for callers instead of swallowing it
+        Thread.currentThread().interrupt();
+        e.printStackTrace();
+      }
+    }
+  }
+
+ /**
+ * JMX management interface through which traffic generation is started and stopped.
+ */
+ @MXBean
+ public interface Controller {
+ /** Starts sending traffic to the node at the given host and port. */
+ void startTraffic(String remoteHost, int remotePort);
+
+ /** Stops traffic that was previously started. */
+ void stopTraffic();
+ }
+
+  /**
+   * Command-line entry point: {@code [options] port}. Parses the options, starts a
+   * {@link BenchmarkDriver} on the given port, and then blocks until the JVM shutdown hook runs.
+   *
+   * Options (defaults): {@code partitions} (1), {@code threads} (1), {@code messageSize} (1024),
+   * {@code numConnections} (1).
+   */
+  public static void main(String[] args) throws Exception {
+    BasicConfigurator.configure();
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+
+    Options options = new Options();
+    options.addOption("partitions", true, "Number of partitions");
+    options.addOption("threads", true, "Number of threads");
+    options.addOption("messageSize", true, "Message size in bytes");
+    options.addOption("numConnections", true, "Number of connections between nodes");
+
+    CommandLine commandLine = new GnuParser().parse(options, args);
+
+    if (commandLine.getArgs().length != 1) {
+      HelpFormatter helpFormatter = new HelpFormatter();
+      // HelpFormatter prepends "usage: " itself, so the syntax string must not repeat it
+      helpFormatter.printHelp("[options] port", options);
+      System.exit(1);
+    }
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        latch.countDown();
+      }
+    });
+
+    int port = Integer.parseInt(commandLine.getArgs()[0]);
+    int numPartitions = Integer.parseInt(commandLine.getOptionValue("partitions", "1"));
+    int numThreads = Integer.parseInt(commandLine.getOptionValue("threads", "1"));
+    int messageSize = Integer.parseInt(commandLine.getOptionValue("messageSize", "1024"));
+    int numConnections = Integer.parseInt(commandLine.getOptionValue("numConnections", "1"));
+
+    new BenchmarkDriver(port, numPartitions, numThreads, messageSize, numConnections).run();
+
+    latch.await();
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java b/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
new file mode 100644
index 0000000..4e802d7
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
@@ -0,0 +1,161 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.resolver.zk.ZKHelixResolver;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Test basic routing table lookups for a ZK-based Helix resolver.
+ */
+public class TestZKHelixResolver extends ZkTestBase {
+ private static final int NUM_PARTICIPANTS = 2;
+ private static final int NUM_PARTITIONS = 2;
+ private static final String CLUSTER_NAME = TestZKHelixResolver.class.getSimpleName();
+ private static final String RESOURCE_NAME = "MyResource";
+ private MockParticipant[] _participants;
+ private MockController _controller;
+ private ClusterSetup _setupTool;
+ private HelixResolver _resolver;
+ private Map<String, InetSocketAddress> _socketMap;
+
+ /**
+ * Creates the cluster and a FULL_AUTO OnlineOffline resource, registers and starts
+ * NUM_PARTICIPANTS mock participants (each with an "IPC_PORT" simple field of i + 100), starts a
+ * mock controller, connects the resolver under test, and waits for the external view to converge.
+ * The expected socket address of every instance is recorded in {@code _socketMap}.
+ */
+ @BeforeClass
+ public void beforeClass() {
+ // Set up cluster
+ _setupTool = new ClusterSetup(_zkclient);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ _setupTool.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS, "OnlineOffline",
+ IdealState.RebalanceMode.FULL_AUTO.toString());
+ _setupTool.rebalanceCluster(CLUSTER_NAME, RESOURCE_NAME, 1, RESOURCE_NAME, null);
+
+ // Set up and start instances
+ _socketMap = Maps.newHashMap();
+ HelixAdmin admin = _setupTool.getClusterManagementTool();
+ _participants = new MockParticipant[NUM_PARTICIPANTS];
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ String host = "localhost";
+ int port = i;
+ // IPC port is offset from the participant port so the two values differ
+ int ipcPort = i + 100;
+ String instanceName = host + "_" + port;
+ InstanceConfig config = new InstanceConfig(instanceName);
+ config.setHostName(host);
+ config.setPort(Integer.toString(port));
+ config.getRecord().setSimpleField("IPC_PORT", Integer.toString(ipcPort));
+ admin.addInstance(CLUSTER_NAME, config);
+ // Remember the address the resolver is expected to report for this instance
+ _socketMap.put(instanceName, new InetSocketAddress(host, ipcPort));
+ _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
+ _participants[i].syncStart();
+ }
+
+ // Start controller
+ _controller = new MockController(_zkaddr, CLUSTER_NAME, "controller_0");
+ _controller.syncStart();
+
+ // Connect a resolver
+ _resolver = new ZKHelixResolver(_zkaddr);
+ _resolver.connect();
+
+ // Wait for External view convergence
+ // NOTE(review): the boolean result of verifyByZkCallback is ignored, so a timeout is not detected
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+ _zkaddr, CLUSTER_NAME), 10000);
+ }
+
+ /**
+ * Checks resolver lookups at four scopes: cluster, resource, a single partition (compared with
+ * the instances listed in the external view), and a source-instance lookup.
+ */
+ @Test
+ public void testResolution() {
+ // Cluster scope: every registered instance address must be among the destinations
+ HelixMessageScope clusterScope = new HelixMessageScope.Builder().cluster(CLUSTER_NAME).build();
+ Set<HelixAddress> destinations = _resolver.getDestinations(clusterScope);
+ Assert.assertNotNull(destinations);
+ Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
+ for (HelixAddress destination : destinations) {
+ addresses.add(destination.getSocketAddress());
+ }
+ Assert.assertTrue(addresses.containsAll(_socketMap.values()), "Expected " + _socketMap.values()
+ + ", found " + addresses);
+
+ // Resource scope: same expectation as cluster scope
+ HelixMessageScope resourceScope =
+ new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME).build();
+ destinations = _resolver.getDestinations(resourceScope);
+ Assert.assertNotNull(destinations);
+ addresses.clear();
+ for (HelixAddress destination : destinations) {
+ addresses.add(destination.getSocketAddress());
+ }
+ Assert.assertTrue(addresses.containsAll(_socketMap.values()), "Expected " + _socketMap.values()
+ + ", found " + addresses);
+
+ // Partition scope: destinations must equal exactly the instances the external view lists
+ HelixMessageScope partition0Scope =
+ new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME)
+ .partition(RESOURCE_NAME + "_0").build();
+ destinations = _resolver.getDestinations(partition0Scope);
+ Assert.assertNotNull(destinations);
+ ExternalView externalView =
+ _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME);
+ Set<String> instanceSet = externalView.getStateMap(RESOURCE_NAME + "_0").keySet();
+ Set<InetSocketAddress> expectedSocketAddrs = Sets.newHashSet();
+ for (String instanceName : instanceSet) {
+ expectedSocketAddrs.add(_socketMap.get(instanceName));
+ }
+ addresses.clear();
+ for (HelixAddress destination : destinations) {
+ addresses.add(destination.getSocketAddress());
+ }
+ Assert.assertEquals(addresses, expectedSocketAddrs, "Expected " + expectedSocketAddrs
+ + ", found " + addresses);
+
+ // Source lookup: the scope names the first participant as sender; its address must come back
+ HelixMessageScope sourceInstanceScope =
+ new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME)
+ .partition(RESOURCE_NAME + "_0").sourceInstance(_participants[0].getInstanceName())
+ .build();
+ HelixAddress sourceAddress = _resolver.getSource(sourceInstanceScope);
+ Assert.assertNotNull(sourceAddress);
+ Assert.assertEquals(sourceAddress.getSocketAddress(),
+ _socketMap.get(_participants[0].getInstanceName()));
+ }
+
+ /**
+ * Disconnects the resolver, then stops the controller and every participant.
+ */
+ @AfterClass
+ public void afterClass() {
+ _resolver.disconnect();
+ _controller.syncStop();
+ for (MockParticipant participant : _participants) {
+ participant.syncStop();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/resources/build_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/build_benchmark.sh b/helix-ipc/src/test/resources/build_benchmark.sh
new file mode 100644
index 0000000..8b94e38
--- /dev/null
+++ b/helix-ipc/src/test/resources/build_benchmark.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Builds helix-ipc (tests skipped) and bundles the two jars plus run_benchmark.sh into
+# /tmp/ipc-benchmark.tar.gz. Run from the repository root.
+cd helix-ipc || exit 1
+mvn package -DskipTests
+mkdir -p /tmp/ipc-benchmark
+# Jar names must match the classpath used by run_benchmark.sh (version 0.7.1)
+cp target/helix-ipc-0.7.1-jar-with-dependencies.jar /tmp/ipc-benchmark
+cp target/helix-ipc-0.7.1-tests.jar /tmp/ipc-benchmark
+cp src/test/resources/run_benchmark.sh /tmp/ipc-benchmark
+cd /tmp || exit 1
+tar cvzf ipc-benchmark.tar.gz ipc-benchmark
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/resources/run_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/run_benchmark.sh b/helix-ipc/src/test/resources/run_benchmark.sh
new file mode 100644
index 0000000..0b9373d
--- /dev/null
+++ b/helix-ipc/src/test/resources/run_benchmark.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Launches BenchmarkDriver with JMX enabled. Requires JAVA_HOME and JMX_PORT in the environment;
+# all script arguments are passed through to BenchmarkDriver unchanged.
+"$JAVA_HOME/bin/java" -cp helix-ipc-0.7.1-jar-with-dependencies.jar:helix-ipc-0.7.1-tests.jar \
+  -Xloggc:"gc_$JMX_PORT.log" \
+  -XX:+UseConcMarkSweepGC \
+  -XX:+UseCMSInitiatingOccupancyOnly \
+  -XX:CMSInitiatingOccupancyFraction=65 \
+  -XX:NewRatio=2 \
+  -Xmx4g \
+  -Xms4g \
+  -Dcom.sun.management.jmxremote \
+  -Dcom.sun.management.jmxremote.port="$JMX_PORT" \
+  -Dcom.sun.management.jmxremote.authenticate=false \
+  -Dcom.sun.management.jmxremote.ssl=false \
+  -Dio.netty.resourceLeakDetection \
+  -Dio.netty.allocator.type=pooled \
+  -Dio.netty.noPreferDirect=false \
+  org.apache.helix.ipc.benchmark.BenchmarkDriver "$@"
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4bd6f2e..ae49319 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,6 +199,7 @@ under the License.
<module>helix-agent</module>
<module>helix-provisioning</module>
<module>helix-examples</module>
+ <module>helix-ipc</module>
<module>recipes</module>
</modules>
[4/4] git commit: Merge branch 'master' of https://github.com/brandtg/helix (closes #2)
Posted by ka...@apache.org.
Merge branch 'master' of https://github.com/brandtg/helix (closes #2)
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d8ec1ae7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d8ec1ae7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d8ec1ae7
Branch: refs/heads/master
Commit: d8ec1ae7560118363e843283ce2f15f611aea099
Parents: 59b4bbb 3bd83a3
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Aug 28 09:49:03 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Aug 28 10:00:43 2014 -0700
----------------------------------------------------------------------
NOTICE | 8 +
.../helix/spectator/RoutingTableProvider.java | 24 +-
.../test/java/org/apache/helix/TestHelper.java | 10 +
helix-ipc/LICENSE | 202 ++++++++
helix-ipc/NOTICE | 41 ++
helix-ipc/pom.xml | 141 ++++++
helix-ipc/src/assemble/assembly.xml | 60 +++
helix-ipc/src/main/config/log4j.properties | 31 ++
.../org/apache/helix/ipc/HelixIPCCallback.java | 32 ++
.../helix/ipc/HelixIPCMessageManager.java | 163 ++++++
.../org/apache/helix/ipc/HelixIPCService.java | 49 ++
.../helix/ipc/netty/NettyHelixIPCService.java | 490 +++++++++++++++++++
.../helix/resolver/AbstractHelixResolver.java | 295 +++++++++++
.../org/apache/helix/resolver/HelixAddress.java | 70 +++
.../helix/resolver/HelixMessageScope.java | 151 ++++++
.../apache/helix/resolver/HelixResolver.java | 47 ++
.../helix/resolver/ResolverRoutingTable.java | 92 ++++
.../helix/resolver/zk/ZKHelixResolver.java | 45 ++
.../helix/ipc/TestNettyHelixIPCService.java | 353 +++++++++++++
.../helix/ipc/benchmark/BenchmarkDriver.java | 221 +++++++++
.../helix/resolver/TestZKHelixResolver.java | 161 ++++++
pom.xml | 1 +
22 files changed, 2685 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/d8ec1ae7/NOTICE
----------------------------------------------------------------------
diff --cc NOTICE
index ca4a5e0,ca4a5e0..2d70d2e
--- a/NOTICE
+++ b/NOTICE
@@@ -32,6 -32,6 +32,14 @@@ This product includes software develope
zkclient (https://github.com/sgroschupf/zkclient).
Licensed under the Apache License 2.0.
++This product includes software developed at
++netty (http://netty.io).
++Licensed under the Apache License 2.0.
++
++This product includes software developed at
++Metrics (http://metrics.codahale.com).
++Licensed under the Apache License 2.0.
++
II. License Summary
- Apache License 2.0
- BSD License
http://git-wip-us.apache.org/repos/asf/helix/blob/d8ec1ae7/helix-ipc/LICENSE
----------------------------------------------------------------------
diff --cc helix-ipc/LICENSE
index 0000000,0000000..d645695
new file mode 100644
--- /dev/null
+++ b/helix-ipc/LICENSE
@@@ -1,0 -1,0 +1,202 @@@
++
++ Apache License
++ Version 2.0, January 2004
++ http://www.apache.org/licenses/
++
++ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
++
++ 1. Definitions.
++
++ "License" shall mean the terms and conditions for use, reproduction,
++ and distribution as defined by Sections 1 through 9 of this document.
++
++ "Licensor" shall mean the copyright owner or entity authorized by
++ the copyright owner that is granting the License.
++
++ "Legal Entity" shall mean the union of the acting entity and all
++ other entities that control, are controlled by, or are under common
++ control with that entity. For the purposes of this definition,
++ "control" means (i) the power, direct or indirect, to cause the
++ direction or management of such entity, whether by contract or
++ otherwise, or (ii) ownership of fifty percent (50%) or more of the
++ outstanding shares, or (iii) beneficial ownership of such entity.
++
++ "You" (or "Your") shall mean an individual or Legal Entity
++ exercising permissions granted by this License.
++
++ "Source" form shall mean the preferred form for making modifications,
++ including but not limited to software source code, documentation
++ source, and configuration files.
++
++ "Object" form shall mean any form resulting from mechanical
++ transformation or translation of a Source form, including but
++ not limited to compiled object code, generated documentation,
++ and conversions to other media types.
++
++ "Work" shall mean the work of authorship, whether in Source or
++ Object form, made available under the License, as indicated by a
++ copyright notice that is included in or attached to the work
++ (an example is provided in the Appendix below).
++
++ "Derivative Works" shall mean any work, whether in Source or Object
++ form, that is based on (or derived from) the Work and for which the
++ editorial revisions, annotations, elaborations, or other modifications
++ represent, as a whole, an original work of authorship. For the purposes
++ of this License, Derivative Works shall not include works that remain
++ separable from, or merely link (or bind by name) to the interfaces of,
++ the Work and Derivative Works thereof.
++
++ "Contribution" shall mean any work of authorship, including
++ the original version of the Work and any modifications or additions
++ to that Work or Derivative Works thereof, that is intentionally
++ submitted to Licensor for inclusion in the Work by the copyright owner
++ or by an individual or Legal Entity authorized to submit on behalf of
++ the copyright owner. For the purposes of this definition, "submitted"
++ means any form of electronic, verbal, or written communication sent
++ to the Licensor or its representatives, including but not limited to
++ communication on electronic mailing lists, source code control systems,
++ and issue tracking systems that are managed by, or on behalf of, the
++ Licensor for the purpose of discussing and improving the Work, but
++ excluding communication that is conspicuously marked or otherwise
++ designated in writing by the copyright owner as "Not a Contribution."
++
++ "Contributor" shall mean Licensor and any individual or Legal Entity
++ on behalf of whom a Contribution has been received by Licensor and
++ subsequently incorporated within the Work.
++
++ 2. Grant of Copyright License. Subject to the terms and conditions of
++ this License, each Contributor hereby grants to You a perpetual,
++ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
++ copyright license to reproduce, prepare Derivative Works of,
++ publicly display, publicly perform, sublicense, and distribute the
++ Work and such Derivative Works in Source or Object form.
++
++ 3. Grant of Patent License. Subject to the terms and conditions of
++ this License, each Contributor hereby grants to You a perpetual,
++ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
++ (except as stated in this section) patent license to make, have made,
++ use, offer to sell, sell, import, and otherwise transfer the Work,
++ where such license applies only to those patent claims licensable
++ by such Contributor that are necessarily infringed by their
++ Contribution(s) alone or by combination of their Contribution(s)
++ with the Work to which such Contribution(s) was submitted. If You
++ institute patent litigation against any entity (including a
++ cross-claim or counterclaim in a lawsuit) alleging that the Work
++ or a Contribution incorporated within the Work constitutes direct
++ or contributory patent infringement, then any patent licenses
++ granted to You under this License for that Work shall terminate
++ as of the date such litigation is filed.
++
++ 4. Redistribution. You may reproduce and distribute copies of the
++ Work or Derivative Works thereof in any medium, with or without
++ modifications, and in Source or Object form, provided that You
++ meet the following conditions:
++
++ (a) You must give any other recipients of the Work or
++ Derivative Works a copy of this License; and
++
++ (b) You must cause any modified files to carry prominent notices
++ stating that You changed the files; and
++
++ (c) You must retain, in the Source form of any Derivative Works
++ that You distribute, all copyright, patent, trademark, and
++ attribution notices from the Source form of the Work,
++ excluding those notices that do not pertain to any part of
++ the Derivative Works; and
++
++ (d) If the Work includes a "NOTICE" text file as part of its
++ distribution, then any Derivative Works that You distribute must
++ include a readable copy of the attribution notices contained
++ within such NOTICE file, excluding those notices that do not
++ pertain to any part of the Derivative Works, in at least one
++ of the following places: within a NOTICE text file distributed
++ as part of the Derivative Works; within the Source form or
++ documentation, if provided along with the Derivative Works; or,
++ within a display generated by the Derivative Works, if and
++ wherever such third-party notices normally appear. The contents
++ of the NOTICE file are for informational purposes only and
++ do not modify the License. You may add Your own attribution
++ notices within Derivative Works that You distribute, alongside
++ or as an addendum to the NOTICE text from the Work, provided
++ that such additional attribution notices cannot be construed
++ as modifying the License.
++
++ You may add Your own copyright statement to Your modifications and
++ may provide additional or different license terms and conditions
++ for use, reproduction, or distribution of Your modifications, or
++ for any such Derivative Works as a whole, provided Your use,
++ reproduction, and distribution of the Work otherwise complies with
++ the conditions stated in this License.
++
++ 5. Submission of Contributions. Unless You explicitly state otherwise,
++ any Contribution intentionally submitted for inclusion in the Work
++ by You to the Licensor shall be under the terms and conditions of
++ this License, without any additional terms or conditions.
++ Notwithstanding the above, nothing herein shall supersede or modify
++ the terms of any separate license agreement you may have executed
++ with Licensor regarding such Contributions.
++
++ 6. Trademarks. This License does not grant permission to use the trade
++ names, trademarks, service marks, or product names of the Licensor,
++ except as required for reasonable and customary use in describing the
++ origin of the Work and reproducing the content of the NOTICE file.
++
++ 7. Disclaimer of Warranty. Unless required by applicable law or
++ agreed to in writing, Licensor provides the Work (and each
++ Contributor provides its Contributions) on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ implied, including, without limitation, any warranties or conditions
++ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
++ PARTICULAR PURPOSE. You are solely responsible for determining the
++ appropriateness of using or redistributing the Work and assume any
++ risks associated with Your exercise of permissions under this License.
++
++ 8. Limitation of Liability. In no event and under no legal theory,
++ whether in tort (including negligence), contract, or otherwise,
++ unless required by applicable law (such as deliberate and grossly
++ negligent acts) or agreed to in writing, shall any Contributor be
++ liable to You for damages, including any direct, indirect, special,
++ incidental, or consequential damages of any character arising as a
++ result of this License or out of the use or inability to use the
++ Work (including but not limited to damages for loss of goodwill,
++ work stoppage, computer failure or malfunction, or any and all
++ other commercial damages or losses), even if such Contributor
++ has been advised of the possibility of such damages.
++
++ 9. Accepting Warranty or Additional Liability. While redistributing
++ the Work or Derivative Works thereof, You may choose to offer,
++ and charge a fee for, acceptance of support, warranty, indemnity,
++ or other liability obligations and/or rights consistent with this
++ License. However, in accepting such obligations, You may act only
++ on Your own behalf and on Your sole responsibility, not on behalf
++ of any other Contributor, and only if You agree to indemnify,
++ defend, and hold each Contributor harmless for any liability
++ incurred by, or claims asserted against, such Contributor by reason
++ of your accepting any such warranty or additional liability.
++
++ END OF TERMS AND CONDITIONS
++
++ APPENDIX: How to apply the Apache License to your work.
++
++ To apply the Apache License to your work, attach the following
++ boilerplate notice, with the fields enclosed by brackets "[]"
++ replaced with your own identifying information. (Don't include
++ the brackets!) The text should be enclosed in the appropriate
++ comment syntax for the file format. We also recommend that a
++ file or class name and description of purpose be included on the
++ same "printed page" as the copyright notice for easier
++ identification within third-party archives.
++
++ Copyright [yyyy] [name of copyright owner]
++
++ Licensed under the Apache License, Version 2.0 (the "License");
++ you may not use this file except in compliance with the License.
++ You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
http://git-wip-us.apache.org/repos/asf/helix/blob/d8ec1ae7/helix-ipc/NOTICE
----------------------------------------------------------------------
diff --cc helix-ipc/NOTICE
index 0000000,1ee0d24..6f657e3
mode 000000,100644..100644
--- a/helix-ipc/NOTICE
+++ b/helix-ipc/NOTICE
@@@ -1,0 -1,33 +1,41 @@@
+ Apache Helix
+ Copyright 2014 The Apache Software Foundation
+
+
+ I. Included Software
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+ Licensed under the Apache License 2.0.
+
+ This product includes software developed at
+ Codehaus (http://www.codehaus.org/).
+ Licensed under the BSD License.
+
+ This product includes software developed at
+ jline (http://jline.sourceforge.net/).
+ Licensed under the BSD License.
+
+ This product includes software developed at
+ Google (http://www.google.com/).
+ Licensed under the Apache License 2.0.
+
+ This product includes software developed at
+ snakeyaml (http://www.snakeyaml.org/).
+ Licensed under the Apache License 2.0.
+
+ This product includes software developed at
+ zkclient (https://github.com/sgroschupf/zkclient).
+ Licensed under the Apache License 2.0.
+
++This product includes software developed at
++netty (http://netty.io).
++Licensed under the Apache License 2.0.
++
++This product includes software developed at
++Metrics (http://metrics.codahale.com).
++Licensed under the Apache License 2.0.
++
+ II. License Summary
+ - Apache License 2.0
+ - BSD License
[2/4] git commit: [HELIX-470] Netty-based IPC layer
Posted by ka...@apache.org.
[HELIX-470] Netty-based IPC layer
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f2475fa9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f2475fa9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f2475fa9
Branch: refs/heads/master
Commit: f2475fa9a6123052fea2588cdd4e439ddc7af020
Parents: bfe5d2d
Author: Greg Brandt <br...@gmail.com>
Authored: Tue Aug 26 13:14:36 2014 -0700
Committer: Greg Brandt <br...@gmail.com>
Committed: Tue Aug 26 13:14:36 2014 -0700
----------------------------------------------------------------------
.../helix/spectator/RoutingTableProvider.java | 24 +-
.../test/java/org/apache/helix/TestHelper.java | 10 +
helix-ipc/LICENSE | 273 +++++++++++
helix-ipc/NOTICE | 33 ++
helix-ipc/pom.xml | 141 ++++++
helix-ipc/src/assemble/assembly.xml | 60 +++
helix-ipc/src/main/config/log4j.properties | 31 ++
.../org/apache/helix/ipc/HelixIPCCallback.java | 32 ++
.../helix/ipc/HelixIPCMessageManager.java | 163 ++++++
.../org/apache/helix/ipc/HelixIPCService.java | 49 ++
.../helix/ipc/netty/NettyHelixIPCService.java | 490 +++++++++++++++++++
.../helix/resolver/AbstractHelixResolver.java | 295 +++++++++++
.../org/apache/helix/resolver/HelixAddress.java | 70 +++
.../helix/resolver/HelixMessageScope.java | 151 ++++++
.../apache/helix/resolver/HelixResolver.java | 47 ++
.../helix/resolver/ResolverRoutingTable.java | 92 ++++
.../helix/resolver/zk/ZKHelixResolver.java | 45 ++
.../helix/ipc/TestNettyHelixIPCService.java | 353 +++++++++++++
.../helix/ipc/benchmark/BenchmarkDriver.java | 221 +++++++++
.../helix/resolver/TestZKHelixResolver.java | 161 ++++++
helix-ipc/src/test/resources/build_benchmark.sh | 29 ++
helix-ipc/src/test/resources/run_benchmark.sh | 37 ++
pom.xml | 1 +
23 files changed, 2806 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 7799ca1..ccce64a 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -38,7 +38,8 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.log4j.Logger;
-public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener {
+public class RoutingTableProvider implements ExternalViewChangeListener,
+ InstanceConfigChangeListener {
private static final Logger logger = Logger.getLogger(RoutingTableProvider.class);
private final AtomicReference<RoutingTable> _routingTableRef;
@@ -91,6 +92,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
return instanceSet;
}
+ /**
+  * Look up the configuration stored for an instance in the current routing table.
+  * @param instanceName name of the instance whose configuration is wanted
+  * @return the matching InstanceConfig, or null when the routing table holds none
+  */
+ public InstanceConfig getInstanceConfig(String instanceName) {
+   RoutingTable currentTable = _routingTableRef.get();
+   return currentTable.getConfig(instanceName);
+ }
+
@Override
public void onExternalViewChange(List<ExternalView> externalViewList,
NotificationContext changeContext) {
@@ -124,12 +134,13 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
+ RoutingTable newRoutingTable = new RoutingTable();
List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
for (InstanceConfig config : configList) {
instanceConfigMap.put(config.getId(), config);
+ newRoutingTable.addConfig(config);
}
- RoutingTable newRoutingTable = new RoutingTable();
if (externalViewList != null) {
for (ExternalView extView : externalViewList) {
String resourceName = extView.getId();
@@ -154,9 +165,11 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
class RoutingTable {
private final HashMap<String, ResourceInfo> resourceInfoMap;
+ private final Map<String, InstanceConfig> instanceConfigMap;
public RoutingTable() {
resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>();
+ instanceConfigMap = new HashMap<String, InstanceConfig>();
}
public void addEntry(String resourceName, String partitionName, String state,
@@ -169,10 +182,17 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
}
+ /**
+  * Store an instance configuration, keyed by the instance name it reports.
+  * @param config the instance configuration to record
+  */
+ public void addConfig(InstanceConfig config) {
+   String instanceName = config.getInstanceName();
+   instanceConfigMap.put(instanceName, config);
+ }
+
ResourceInfo get(String resourceName) {
return resourceInfoMap.get(resourceName);
}
+ /**
+  * Fetch the configuration previously stored for an instance.
+  * @param instanceName name of the instance to look up
+  * @return the stored InstanceConfig, or null if nothing is stored under that name
+  */
+ InstanceConfig getConfig(String instanceName) {
+   InstanceConfig storedConfig = instanceConfigMap.get(instanceName);
+   return storedConfig;
+ }
}
class ResourceInfo {
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 4a9139f..64796ba 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -20,7 +20,9 @@ package org.apache.helix;
*/
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Method;
+import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -769,4 +771,12 @@ public class TestHelper {
System.out.println(sb.toString());
}
+ /**
+  * Pick a local port by binding a throwaway server socket and reading back the
+  * port it was given. The socket is closed before returning, so the port is only
+  * known to have been free at the time of the call; another process may claim it
+  * before the caller binds to it.
+  * @return the port number the temporary socket was bound to
+  * @throws IOException if the socket cannot be created, bound, or closed
+  */
+ public static int getRandomPort() throws IOException {
+   ServerSocket sock = new ServerSocket();
+   try {
+     // A null endpoint lets the system choose the address and port.
+     sock.bind(null);
+     return sock.getLocalPort();
+   } finally {
+     // Always release the socket, including when bind() fails.
+     sock.close();
+   }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/LICENSE
----------------------------------------------------------------------
diff --git a/helix-ipc/LICENSE b/helix-ipc/LICENSE
new file mode 100644
index 0000000..413913f
--- /dev/null
+++ b/helix-ipc/LICENSE
@@ -0,0 +1,273 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+
+For xstream:
+
+Copyright (c) 2003-2006, Joe Walnes
+Copyright (c) 2006-2009, 2011 XStream Committers
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or other materials provided
+with the distribution.
+
+3. Neither the name of XStream nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+for jline:
+
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/NOTICE
----------------------------------------------------------------------
diff --git a/helix-ipc/NOTICE b/helix-ipc/NOTICE
new file mode 100644
index 0000000..1ee0d24
--- /dev/null
+++ b/helix-ipc/NOTICE
@@ -0,0 +1,33 @@
+Apache Helix
+Copyright 2014 The Apache Software Foundation
+
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+Codehaus (http://www.codehaus.org/).
+Licensed under the BSD License.
+
+This product includes software developed at
+jline (http://jline.sourceforge.net/).
+Licensed under the BSD License.
+
+This product includes software developed at
+Google (http://www.google.com/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+snakeyaml (http://www.snakeyaml.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+zkclient (https://github.com/sgroschupf/zkclient).
+Licensed under the Apache License 2.0.
+
+II. License Summary
+- Apache License 2.0
+- BSD License
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/pom.xml
----------------------------------------------------------------------
diff --git a/helix-ipc/pom.xml b/helix-ipc/pom.xml
new file mode 100644
index 0000000..90ab11d
--- /dev/null
+++ b/helix-ipc/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix</artifactId>
+ <version>0.7.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>helix-ipc</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Apache Helix :: IPC</name>
+
+ <properties>
+ <!-- OSGi Import-Package instruction consumed by the bundle packaging. -->
+ <osgi.import>
+ javax.management*,
+ org.apache.commons.math*;version="[2.1,3)",
+ org.apache.log4j*;version="[1.2,2)",
+ org.restlet;version="[2.1.4,3]",
+ *
+ </osgi.import>
+ <osgi.ignore>
+ org.apache.helix.tools*
+ </osgi.ignore>
+ <!-- The version attribute value must be closed with a quote before the -noimport directive. -->
+ <osgi.export>org.apache.helix*;version="${project.version}";-noimport:=true</osgi.export>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.21.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ <resource>
+ <directory>${basedir}</directory>
+ <includes>
+ <include>DISCLAIMER</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>org.apache.helix.ipc.netty.NettyHelixIPCService</mainClass>
+ <name>netty-helix-ipc-service</name>
+ </program>
+ </programs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <!--
+ <descriptors>
+ <descriptor>src/assemble/assembly.xml</descriptor>
+ </descriptors>
+ -->
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/helix-ipc/src/assemble/assembly.xml b/helix-ipc/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/helix-ipc/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+ <id>pkg</id>
+ <formats>
+ <format>tar</format>
+ </formats>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+ <outputDirectory>repo</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ <excludes>
+ <exclude>**/*.xml</exclude>
+ </excludes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+ <outputDirectory>conf</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>LICENSE</include>
+ <include>NOTICE</include>
+ <include>DISCLAIMER</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/config/log4j.properties b/helix-ipc/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/helix-ipc/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
new file mode 100644
index 0000000..9ea23ca
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
@@ -0,0 +1,32 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixMessageScope;
+
+import java.util.UUID;
+
+/**
+ * Callback registered per message type to handle messages.
+ * <p>
+ * Implementations are registered through
+ * {@link HelixIPCService#registerCallback(int, HelixIPCCallback)}.
+ * </p>
+ */
+public interface HelixIPCCallback {
+ /**
+ * Handles one received message of the type this callback was registered for.
+ *
+ * @param scope scope attached to the message (NOTE(review): presumably the cluster / resource /
+ * partition / state the sender addressed; confirm against the transport implementation)
+ * @param messageId identifier of the message
+ * @param message message payload (NOTE(review): who owns / releases this buffer is not visible
+ * here; confirm before keeping it past this call)
+ */
+ void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message);
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
new file mode 100644
index 0000000..89b10eb
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
@@ -0,0 +1,163 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A wrapper around a base IPC service that manages message retries / timeouts.
+ * <p>
+ * This class manages retries and timeouts, and can be used in the same way as a
+ * {@link org.apache.helix.ipc.HelixIPCService}.
+ * </p>
+ * <p>
+ * A message will be sent until the max number of retries has been reached (i.e. timed out), or it
+ * is acknowledged by the recipient. If the max number of retries is -1, it will be retried forever.
+ * </p>
+ * <p>
+ * A callback should be registered for every acknowledgement message type associated with any
+ * original message type sent by this class.
+ * </p>
+ * <p>
+ * For example, consider we have the two message types defined: DATA_REQ = 1, DATA_ACK = 2. One
+ * would do the following:
+ *
+ * <pre>
+ * messageManager.registerCallback(DATA_ACK, new HelixIPCCallback() {
+ * public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+ * // Process ACK
+ * }
+ *
+ * public void onError(HelixMessageScope scope, UUID messageId, Throwable cause) {
+ * // Message error or timeout
+ * }
+ * });
+ *
+ * messageManager.send(destinations, DATA_REQ, messageId, data);
+ * </pre>
+ *
+ * </p>
+ * <p>
+ * In send, we note messageId, and retry until we get a DATA_ACK for the same messageId. The
+ * callback registered with the message manager will only be called once, even if the message is
+ * acknowledged several times.
+ * </p>
+ */
+public class HelixIPCMessageManager implements HelixIPCService {
+
+ private static final Logger LOG = Logger.getLogger(HelixIPCMessageManager.class);
+
+ private final ScheduledExecutorService scheduler;
+ private final HelixIPCService baseIpcService;
+ private final long messageTimeoutMillis;
+ private final int maxNumRetries;
+ private final ConcurrentMap<UUID, Boolean> pendingMessages;
+ private final ConcurrentMap<UUID, AtomicInteger> retriesLeft;
+ private final ConcurrentMap<UUID, ByteBuf> messageBuffers;
+ private final ConcurrentMap<Integer, HelixIPCCallback> callbacks;
+
+ public HelixIPCMessageManager(ScheduledExecutorService scheduler, HelixIPCService baseIpcService,
+ long messageTimeoutMillis, int maxNumRetries) {
+ this.scheduler = scheduler;
+ this.baseIpcService = baseIpcService;
+ this.maxNumRetries = maxNumRetries;
+ this.messageTimeoutMillis = messageTimeoutMillis;
+ this.pendingMessages = new ConcurrentHashMap<UUID, Boolean>();
+ this.retriesLeft = new ConcurrentHashMap<UUID, AtomicInteger>();
+ this.messageBuffers = new ConcurrentHashMap<UUID, ByteBuf>();
+ this.callbacks = new ConcurrentHashMap<Integer, HelixIPCCallback>();
+ }
+
+ @Override
+ public void start() throws Exception {
+ baseIpcService.start();
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ baseIpcService.shutdown();
+ }
+
+ @Override
+ public void send(final HelixAddress destination, final int messageType, final UUID messageId,
+ final ByteBuf message) {
+ // State
+ pendingMessages.put(messageId, true);
+ retriesLeft.putIfAbsent(messageId, new AtomicInteger(maxNumRetries));
+ messageBuffers.put(messageId, message);
+
+ // Will free it when we've finally received response
+ message.retain();
+
+ // Send initial message
+ baseIpcService.send(destination, messageType, messageId, message);
+
+ // Retries
+ scheduler.schedule(new Runnable() {
+ @Override
+ public void run() {
+ Boolean isPending = pendingMessages.get(messageId);
+ AtomicInteger numLeft = retriesLeft.get(messageId);
+ if (numLeft != null && isPending != null && isPending) {
+ if (numLeft.decrementAndGet() > 0 || maxNumRetries == -1) {
+ // n.b. will schedule another retry
+ send(destination, messageType, messageId, message);
+ } else {
+ LOG.warn("Message " + messageId + " timed out after " + maxNumRetries + " retries");
+ }
+ }
+ }
+ }, messageTimeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void registerCallback(final int messageType, final HelixIPCCallback callback) {
+
+ // This callback will first check if the message is pending, then delegate to the provided
+ // callback if it has not yet done so.
+ HelixIPCCallback wrappedCallback = new HelixIPCCallback() {
+ @Override
+ public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+ if (pendingMessages.replace(messageId, true, false)) {
+ pendingMessages.remove(messageId);
+ ByteBuf originalMessage = messageBuffers.remove(messageId);
+ if (originalMessage != null) {
+ originalMessage.release();
+ }
+ retriesLeft.remove(messageId);
+ callback.onMessage(scope, messageId, message);
+ }
+ }
+ };
+
+ callbacks.put(messageType, wrappedCallback);
+ baseIpcService.registerCallback(messageType, wrappedCallback);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
new file mode 100644
index 0000000..6158514
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
@@ -0,0 +1,49 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixAddress;
+
+import java.util.UUID;
+
+/**
+ * Allows message passing among instances in Helix clusters.
+ * <p>
+ * Messages are sent asynchronously using {@link #send}, and handled by callbacks registered via
+ * {@link #registerCallback}
+ * </p>
+ * <p>
+ * Lifecycle: call {@link #start} before the first {@link #send}, and {@link #shutdown} when the
+ * service is no longer needed.
+ * </p>
+ */
+public interface HelixIPCService {
+
+ // NOTE(review): presumably the name of the property under which an instance's IPC port is
+ // published; its usage is not visible in this file, so confirm against callers.
+ static final String IPC_PORT = "IPC_PORT";
+
+ /**
+ * Starts service (must call before {@link #send}).
+ *
+ * @throws Exception if the service cannot be started
+ */
+ void start() throws Exception;
+
+ /**
+ * Shuts down service and releases any resources.
+ *
+ * @throws Exception if the service cannot be shut down cleanly
+ */
+ void shutdown() throws Exception;
+
+ /**
+ * Sends a message to the given destination address.
+ *
+ * @param destination address of the recipient
+ * @param messageType application-defined type; callbacks are registered per message type via
+ * {@link #registerCallback}
+ * @param messageId identifier of the message
+ * @param message message payload (NOTE(review): whether null is accepted depends on the
+ * implementation; confirm before passing null)
+ */
+ void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message);
+
+ /**
+ * Registers a callback for a given message type.
+ *
+ * @param messageType the message type to handle
+ * @param callback the handler for messages of that type
+ */
+ void registerCallback(int messageType, HelixIPCCallback callback);
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
new file mode 100644
index 0000000..00d6157
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
@@ -0,0 +1,490 @@
+package org.apache.helix.ipc.netty;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.ipc.HelixIPCCallback;
+import org.apache.helix.ipc.HelixIPCService;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.Logger;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Provides partition/state-level messaging among nodes in a Helix cluster.
+ * <p>
+ * The message format is (where len == 4B, and contains the length of the next field)
+ *
+ * <pre>
+ * +----------------------+
+ * | totalLength (4B) |
+ * +----------------------+
+ * | version (4B) |
+ * +----------------------+
+ * | messageType (4B) |
+ * +----------------------+
+ * | messageId (16B) |
+ * +----------------------+
+ * | len | cluster |
+ * +----------------------+
+ * | len | resource |
+ * +----------------------+
+ * | len | partition |
+ * +----------------------+
+ * | len | state |
+ * +----------------------+
+ * | len | srcInstance |
+ * +----------------------+
+ * | len | dstInstance |
+ * +----------------------+
+ * | len | message |
+ * +----------------------+
+ * </pre>
+ *
+ * </p>
+ */
+public class NettyHelixIPCService implements HelixIPCService {
+
+ private static final Logger LOG = Logger.getLogger(NettyHelixIPCService.class);
+ // Wire-format version; send() writes it as the first int of every message header.
+ private static final int MESSAGE_VERSION = 1;
+
+ // Parameters for length header field of message (tells decoder to interpret but preserve length
+ // field in message). They are passed to the LengthFieldBasedFrameDecoder installed in start().
+ // Largest frame the decoder is configured to accept, in bytes.
+ private static final int MAX_FRAME_LENGTH = 1024 * 1024;
+ // The length field is the very first field of a frame.
+ private static final int LENGTH_FIELD_OFFSET = 0;
+ // The length field is a 4-byte int; also used for the client-side LengthFieldPrepender.
+ private static final int LENGTH_FIELD_LENGTH = 4;
+ // The sender's LengthFieldPrepender is created with lengthIncludesLengthFieldLength = true, so
+ // the transmitted length already counts its own 4 bytes; -4 compensates for that on decode.
+ private static final int LENGTH_ADJUSTMENT = -4;
+ // Nothing is stripped, so the length field stays at the front of the decoded frame.
+ private static final int INITIAL_BYTES_TO_STRIP = 0;
+ // Number of 4-byte "len" prefixes in a message (cluster, resource, partition, state,
+ // srcInstance, dstInstance, message); used by send() to size the header buffer.
+ private static final int NUM_LENGTH_FIELDS = 7;
+
+ private final Config config;
+ // Initialised to true in the constructor; start() flips it to false and shutdown() back to true.
+ private final AtomicBoolean isShutdown;
+ // Destination socket address -> fixed-size list of channel slots (config.getNumConnections()).
+ // Writes happen under synchronized (channelMap).
+ // NOTE(review): send() also reads this plain HashMap outside that lock; confirm this is safe.
+ private final Map<InetSocketAddress, List<Channel>> channelMap;
+ // Channel -> System.currentTimeMillis() at the moment send() connected it.
+ private final ConcurrentMap<Channel, Long> channelOpenTimes;
+ private final MetricRegistry metricRegistry;
+ // NOTE(review): presumably message type -> registered callback; the registration code is not
+ // visible in this part of the file.
+ private final ConcurrentMap<Integer, HelixIPCCallback> callbacks;
+
+ // The fields below are (re)created by start().
+ private EventLoopGroup eventLoopGroup;
+ private Bootstrap clientBootstrap;
+ private Meter statTxMsg;
+ private Meter statRxMsg;
+ private Meter statTxBytes;
+ private Meter statRxBytes;
+ private Counter statChannelOpen;
+ private Counter statError;
+ private JmxReporter jmxReporter;
+
+ /**
+  * Creates a service in the shut-down state; nothing is bound or connected until
+  * {@link #start()} is called.
+  *
+  * @param config settings for this service (port, instance name, connections per destination)
+  */
+ public NettyHelixIPCService(Config config) {
+   // Collaborators and bookkeeping structures
+   this.metricRegistry = new MetricRegistry();
+   this.callbacks = new ConcurrentHashMap<Integer, HelixIPCCallback>();
+   this.channelOpenTimes = new ConcurrentHashMap<Channel, Long>();
+   this.channelMap = new HashMap<InetSocketAddress, List<Channel>>();
+
+   // Not running until start() is invoked
+   this.isShutdown = new AtomicBoolean(true);
+   this.config = config;
+ }
+
+ /**
+  * Starts the message handling server and creates the client bootstrap.
+  * <p>
+  * Does nothing if the service is already running. Otherwise it creates the event loop group,
+  * registers the metrics and starts the JMX reporter, binds the server socket to the configured
+  * port, and prepares the bootstrap used by {@link #send} to open outgoing connections.
+  * </p>
+  * <p>
+  * This method waits for the bind to complete, so a failure (e.g. the port is already in use) is
+  * reported to the caller instead of being silently dropped. On failure the service is returned
+  * to the shut-down state.
+  * </p>
+  *
+  * @throws Exception if the server socket cannot be bound, or the calling thread is interrupted
+  *           while waiting for the bind to complete
+  */
+ @Override
+ public void start() throws Exception {
+   if (!isShutdown.getAndSet(false)) {
+     // Already running
+     return;
+   }
+
+   eventLoopGroup = new NioEventLoopGroup();
+
+   statTxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txMsg"));
+   statRxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxMsg"));
+   statTxBytes =
+       metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txBytes"));
+   statRxBytes =
+       metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxBytes"));
+   statChannelOpen =
+       metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "channelOpen"));
+   statError = metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "error"));
+
+   // Report metrics via JMX
+   jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
+   jmxReporter.start();
+
+   try {
+     // Server side: split the byte stream into frames, then hand each frame to the callbacks.
+     new ServerBootstrap().group(eventLoopGroup).channel(NioServerSocketChannel.class)
+         .option(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_KEEPALIVE, true)
+         .childHandler(new ChannelInitializer<SocketChannel>() {
+           @Override
+           protected void initChannel(SocketChannel socketChannel) throws Exception {
+             socketChannel.pipeline().addLast(
+                 new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET,
+                     LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
+             socketChannel.pipeline().addLast(new HelixIPCCallbackHandler());
+           }
+         }).bind(new InetSocketAddress(config.getPort())).sync(); // wait, and surface bind errors
+   } catch (Exception e) {
+     // Roll back so the service is not left half-started and start() can be retried.
+     jmxReporter.stop();
+     eventLoopGroup.shutdownGracefully();
+     isShutdown.set(true);
+     throw e;
+   }
+
+   // Client side: every outgoing message is prefixed with its total length (length field included).
+   clientBootstrap =
+       new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class)
+           .option(ChannelOption.SO_KEEPALIVE, true)
+           .handler(new ChannelInitializer<SocketChannel>() {
+             @Override
+             protected void initChannel(SocketChannel socketChannel) throws Exception {
+               socketChannel.pipeline().addLast(
+                   new LengthFieldPrepender(LENGTH_FIELD_LENGTH, true));
+             }
+           });
+ }
+
+ /**
+ * Shuts down event loops for message handling server and message passing client.
+ */
+ public void shutdown() throws Exception {
+ if (!isShutdown.getAndSet(true)) {
+ jmxReporter.stop();
+ eventLoopGroup.shutdownGracefully();
+ }
+ }
+
+ /**
+  * Sends a single message to the instance identified by {@code destination}.
+  * <p>
+  * One of {@code config.getNumConnections()} channels to the destination socket address is chosen
+  * by hashing the destination scope. The channel is (re)connected when it is missing, closed, or
+  * has been open for at least {@code config.getMaxChannelLifeMillis()}.
+  * <p>
+  * The bytes handed to the channel are: version, message type, the 128-bit message ID, six
+  * length-prefixed strings (cluster, resource, partition, state, source instance, destination
+  * instance), the payload length, and finally the payload.
+  * @param destination the resolved address (scope, instance name, socket address) to send to
+  * @param messageType type code the receiver uses to look up its registered callback
+  * @param messageId ID of this message
+  * @param message the payload; may be null, in which case a payload length of 0 is written
+  * @throws IllegalStateException if the channel cannot be obtained or the message cannot be
+  *           assembled and handed to it
+  */
+ @Override
+ public void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message) {
+   // Send message
+   try {
+     // Get (or lazily create) the list of channel slots for this socket address
+     List<Channel> channels = channelMap.get(destination.getSocketAddress());
+     if (channels == null) {
+       synchronized (channelMap) {
+         channels = channelMap.get(destination.getSocketAddress());
+         if (channels == null) {
+           channels = new ArrayList<Channel>(config.getNumConnections());
+           for (int i = 0; i < config.getNumConnections(); i++) {
+             channels.add(null);
+           }
+           channelMap.put(destination.getSocketAddress(), channels);
+         }
+       }
+     }
+
+     // Pick the channel for this scope (mask keeps the hash non-negative)
+     int idx = (Integer.MAX_VALUE & destination.getScope().hashCode()) % channels.size();
+     Channel channel = channels.get(idx);
+     if (channel == null || !channel.isOpen() || isExpired(channel)) {
+       synchronized (channelMap) {
+         channel = channels.get(idx);
+         if (channel == null || !channel.isOpen() || isExpired(channel)) {
+           if (channel != null) {
+             // Forget the open time of the channel being replaced; otherwise every rotated
+             // channel would stay referenced by channelOpenTimes indefinitely
+             channelOpenTimes.remove(channel);
+             if (channel.isOpen()) {
+               channel.close();
+             }
+           }
+           channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel();
+           channels.set(idx, channel);
+           statChannelOpen.inc();
+           channelOpenTimes.put(channel, System.currentTimeMillis());
+         }
+       }
+     }
+
+     // Compute total header length
+     int headerLength =
+         NUM_LENGTH_FIELDS * (Integer.SIZE / 8)
+             + (Integer.SIZE / 8) * 2 // version, type
+             + (Long.SIZE / 8) * 2 // 128 bit UUID
+             + getLength(destination.getScope().getCluster())
+             + getLength(destination.getScope().getResource())
+             + getLength(destination.getScope().getPartition())
+             + getLength(destination.getScope().getState())
+             + getLength(config.getInstanceName())
+             + getLength(destination.getInstanceName());
+     int messageLength = message == null ? 0 : message.readableBytes();
+
+     // Build message header
+     ByteBuf headerBuf = channel.alloc().buffer(headerLength);
+     headerBuf.writeInt(MESSAGE_VERSION).writeInt(messageType)
+         .writeLong(messageId.getMostSignificantBits())
+         .writeLong(messageId.getLeastSignificantBits());
+     writeStringWithLength(headerBuf, destination.getScope().getCluster());
+     writeStringWithLength(headerBuf, destination.getScope().getResource());
+     writeStringWithLength(headerBuf, destination.getScope().getPartition());
+     writeStringWithLength(headerBuf, destination.getScope().getState());
+     writeStringWithLength(headerBuf, config.getInstanceName());
+     writeStringWithLength(headerBuf, destination.getInstanceName());
+
+     // Compose message header and payload without copying the payload
+     headerBuf.writeInt(messageLength);
+     CompositeByteBuf fullByteBuf = channel.alloc().compositeBuffer(2);
+     fullByteBuf.addComponent(headerBuf);
+     // addComponent does not advance the writer index, so it is moved by hand
+     fullByteBuf.writerIndex(headerBuf.readableBytes());
+     if (message != null) {
+       fullByteBuf.addComponent(message);
+       fullByteBuf.writerIndex(fullByteBuf.writerIndex() + message.readableBytes());
+     }
+
+     // Send
+     statTxMsg.mark();
+     statTxBytes.mark(fullByteBuf.readableBytes());
+     channel.writeAndFlush(fullByteBuf);
+   } catch (Exception e) {
+     statError.inc();
+     throw new IllegalStateException("Could not send message to " + destination, e);
+   }
+ }
+
+ /**
+  * Tells whether a channel has been open for at least the configured maximum channel lifetime.
+  * A channel with no recorded open time is never reported as expired.
+  */
+ private boolean isExpired(Channel channel) {
+   Long openedAtMillis = channelOpenTimes.get(channel);
+   if (openedAtMillis == null) {
+     return false;
+   }
+   long ageMillis = System.currentTimeMillis() - openedAtMillis;
+   return ageMillis >= config.getMaxChannelLifeMillis();
+ }
+
+ /**
+ * Stores the callback under the given message type; the inbound handler looks callbacks up by
+ * the type carried in each received message.
+ * @param messageType the message type code to handle
+ * @param callback the callback to invoke for messages of that type
+ */
+ @Override
+ public void registerCallback(int messageType, HelixIPCCallback callback) {
+ callbacks.put(messageType, callback);
+ }
+
+ @ChannelHandler.Sharable
+ private class HelixIPCCallbackHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+ HelixIPCCallbackHandler() {
+ super(false); // we will manage reference
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
+ try {
+ int idx = 0;
+
+ // Message length
+ int messageLength = byteBuf.readInt();
+ idx += 4;
+
+ // Message version
+ @SuppressWarnings("unused")
+ int messageVersion = byteBuf.readInt();
+ idx += 4;
+
+ // Message type
+ int messageType = byteBuf.readInt();
+ idx += 4;
+
+ // Message ID
+ UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong());
+ idx += 16;
+
+ // Cluster
+ byteBuf.readerIndex(idx);
+ int clusterSize = byteBuf.readInt();
+ idx += 4;
+ checkLength("clusterSize", clusterSize, messageLength);
+ String clusterName = toNonEmptyString(clusterSize, byteBuf);
+ idx += clusterSize;
+
+ // Resource
+ byteBuf.readerIndex(idx);
+ int resourceSize = byteBuf.readInt();
+ idx += 4;
+ checkLength("resourceSize", resourceSize, messageLength);
+ String resourceName = toNonEmptyString(resourceSize, byteBuf);
+ idx += resourceSize;
+
+ // Partition
+ byteBuf.readerIndex(idx);
+ int partitionSize = byteBuf.readInt();
+ idx += 4;
+ checkLength("partitionSize", partitionSize, messageLength);
+ String partitionName = toNonEmptyString(partitionSize, byteBuf);
+ idx += partitionSize;
+
+ // State
+ byteBuf.readerIndex(idx);
+ int stateSize = byteBuf.readInt();
+ idx += 4;
+ checkLength("stateSize", stateSize, messageLength);
+ String state = toNonEmptyString(stateSize, byteBuf);
+ idx += stateSize;
+
+ // Source instance
+ byteBuf.readerIndex(idx);
+ int srcInstanceSize = byteBuf.readInt();
+ idx += 4;
+ checkLength("srcInstanceSize", srcInstanceSize, messageLength);
+ String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf);
+ idx += srcInstanceSize;
+
+ // Destination instance
+ byteBuf.readerIndex(idx);
+ int dstInstanceSize = byteBuf.readInt();
+ idx += 4;
+ checkLength("dstInstanceSize", dstInstanceSize, messageLength);
+ String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf);
+ idx += dstInstanceSize;
+
+ // Position at message
+ byteBuf.readerIndex(idx + 4);
+
+ // Error check
+ if (dstInstance == null) {
+ throw new IllegalStateException("Received message addressed to null destination from "
+ + srcInstance);
+ } else if (!dstInstance.equals(config.getInstanceName())) {
+ throw new IllegalStateException(config.getInstanceName()
+ + " received message addressed to " + dstInstance + " from " + srcInstance);
+ } else if (callbacks.get(messageType) == null) {
+ throw new IllegalStateException("No callback registered for message type " + messageType);
+ }
+
+ // Build scope
+ HelixMessageScope scope =
+ new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName)
+ .partition(partitionName).state(state).sourceInstance(srcInstance).build();
+
+ // Get callback
+ HelixIPCCallback callback = callbacks.get(messageType);
+ if (callback == null) {
+ throw new IllegalStateException("No callback registered for message type " + messageType);
+ }
+
+ // Handle callback
+ callback.onMessage(scope, messageId, byteBuf);
+
+ // Stats
+ statRxMsg.mark();
+ statRxBytes.mark(messageLength);
+ } finally {
+ byteBuf.release();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) {
+ LOG.error(cause);
+ }
+ }
+
+ /**
+  * Decodes the {@code length} bytes starting at the buffer's current reader index as a String,
+  * without moving the reader index.
+  * <p>
+  * Bytes are decoded as ISO-8859-1 (one byte per char), which is the exact inverse of how
+  * writeStringWithLength emits characters and does not depend on the platform default charset.
+  * @return the decoded string (empty when {@code length} is 0), or null if fewer than
+  *         {@code length} bytes are readable
+  */
+ private static String toNonEmptyString(int length, ByteBuf byteBuf) {
+   if (byteBuf.readableBytes() >= length) {
+     return byteBuf.toString(byteBuf.readerIndex(), length, Charset.forName("ISO-8859-1"));
+   }
+   return null;
+ }
+
+ /**
+  * Appends a length-prefixed string to {@code buf}: an int holding {@code s.length()} followed by
+  * one byte per character. A null string is written as the single int 0.
+  * <p>
+  * Each character is passed to {@code writeByte}, so only its low-order 8 bits are written.
+  */
+ private static void writeStringWithLength(ByteBuf buf, String s) {
+   int charCount = (s == null) ? 0 : s.length();
+   buf.writeInt(charCount);
+   for (int pos = 0; pos < charCount; pos++) {
+     buf.writeByte(s.charAt(pos));
+   }
+ }
+
+ /** Null-tolerant {@code String.length()}: yields 0 for a null string. */
+ private static int getLength(String s) {
+   if (s == null) {
+     return 0;
+   }
+   return s.length();
+ }
+
+ /**
+  * Rejects a field length that is larger than the length of the whole message, so a corrupt
+  * length cannot trigger a huge allocation (attempt to prevent OOM exceptions).
+  * @param fieldName name of the length field, used in the error message
+  * @param length the length value read for that field
+  * @param messageLength the length of the enclosing message
+  * @throws java.lang.IllegalArgumentException if length > messageLength
+  */
+ private static void checkLength(String fieldName, int length, int messageLength)
+     throws IllegalArgumentException {
+   if (length <= messageLength) {
+     return;
+   }
+   String detail = fieldName + "=" + length + " is greater than messageLength=" + messageLength;
+   throw new IllegalArgumentException(detail);
+ }
+
+ /**
+  * Chainable settings for the IPC service. Defaults: one connection per destination and a
+  * maximum channel lifetime of 5000 ms.
+  */
+ public static class Config {
+   /** Name written as the source of outgoing messages and expected as the destination of incoming ones */
+   private String instanceName;
+   /** Local port the message handling server binds to */
+   private int port;
+   /** Number of channel slots kept per destination socket address */
+   private int numConnections = 1;
+   /** Age in milliseconds at which an open channel is replaced by a new one */
+   private long maxChannelLifeMillis = 5000;
+
+   public Config setInstanceName(String instanceName) {
+     this.instanceName = instanceName;
+     return this;
+   }
+
+   public Config setPort(int port) {
+     this.port = port;
+     return this;
+   }
+
+   /**
+    * @param numConnections channels per destination; must be at least 1 because the sender
+    *          picks a channel with {@code hash % numConnections}
+    * @throws IllegalArgumentException if numConnections is less than 1
+    */
+   public Config setNumConnections(int numConnections) {
+     if (numConnections < 1) {
+       throw new IllegalArgumentException("numConnections must be at least 1, got "
+           + numConnections);
+     }
+     this.numConnections = numConnections;
+     return this;
+   }
+
+   public Config setMaxChannelLifeMillis(long maxChannelLifeMillis) {
+     this.maxChannelLifeMillis = maxChannelLifeMillis;
+     return this;
+   }
+
+   public String getInstanceName() {
+     return instanceName;
+   }
+
+   public int getPort() {
+     return port;
+   }
+
+   public int getNumConnections() {
+     return numConnections;
+   }
+
+   public long getMaxChannelLifeMillis() {
+     return maxChannelLifeMillis;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
new file mode 100644
index 0000000..c0fd2bb
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
@@ -0,0 +1,295 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A basic implementation of a resolver in terms of expiring routing tables
+ */
+public abstract class AbstractHelixResolver implements HelixResolver {
+ private static final Logger LOG = Logger.getLogger(AbstractHelixResolver.class);
+ private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+ private static final long DEFAULT_LEASE_LENGTH_MS = 60 * 60 * 1000; // TODO: are these good
+ // values?
+ private static final String IPC_PORT = "IPC_PORT";
+ private final Map<String, Spectator> _connections;
+ private boolean _isConnected;
+ private ScheduledExecutorService _executor;
+
+ protected AbstractHelixResolver() {
+ _connections = Maps.newHashMap();
+ _isConnected = false;
+ }
+
+ @Override
+ public void connect() {
+ _executor = Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL_SIZE);
+ _isConnected = true;
+ }
+
+ @Override
+ public void disconnect() {
+ synchronized (_connections) {
+ for (Spectator connection : _connections.values()) {
+ connection.shutdown();
+ }
+ _connections.clear();
+ }
+ _executor.shutdown();
+ _isConnected = false;
+ }
+
+ @Override
+ public Set<HelixAddress> getDestinations(HelixMessageScope scope) {
+ if (!scope.isValid()) {
+ LOG.error("Scope " + scope + " is not valid!");
+ return new HashSet<HelixAddress>();
+ } else if (!_isConnected) {
+ LOG.error("Cannot resolve " + scope + " without first connecting!");
+ return new HashSet<HelixAddress>();
+ }
+
+ // Connect or refresh connection
+ String cluster = scope.getCluster();
+ ResolverRoutingTable routingTable;
+ Spectator connection = _connections.get(cluster);
+ if (connection == null || !connection.getManager().isConnected()) {
+ synchronized (_connections) {
+ connection = _connections.get(cluster);
+ if (connection == null || !connection.getManager().isConnected()) {
+ connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS);
+ connection.init();
+ _connections.put(cluster, connection);
+ }
+ }
+ }
+ routingTable = connection.getRoutingTable();
+
+ // Resolve all resources, either explicitly or match all
+ Set<String> resources;
+ if (scope.getResource() != null) {
+ resources = Sets.newHashSet(scope.getResource());
+ } else {
+ resources = routingTable.getResources();
+ }
+
+ // Resolve all partitions
+ Map<String, Set<String>> partitionMap = Maps.newHashMap();
+ if (scope.getPartition() != null) {
+ for (String resource : resources) {
+ partitionMap.put(resource, Sets.newHashSet(scope.getPartition()));
+ }
+ } else {
+ for (String resource : resources) {
+ partitionMap.put(resource, routingTable.getPartitions(resource));
+ }
+ }
+
+ // Resolve all states
+ Set<String> states;
+ if (scope.getState() != null) {
+ states = Sets.newHashSet(scope.getState());
+ } else {
+ states = routingTable.getStates();
+ }
+
+ // Get all the participants that match
+ Set<InstanceConfig> participants = Sets.newHashSet();
+ for (String resource : resources) {
+ for (String partition : partitionMap.get(resource)) {
+ for (String state : states) {
+ participants.addAll(routingTable.getInstances(resource, partition, state));
+ }
+ }
+ }
+
+ // Resolve those participants
+ Set<HelixAddress> result = new HashSet<HelixAddress>();
+ for (InstanceConfig participant : participants) {
+ String ipcPort = participant.getRecord().getSimpleField(IPC_PORT);
+ if (ipcPort == null) {
+ LOG.error("No ipc address registered for target instance " + participant.getInstanceName()
+ + ", skipping");
+ } else {
+ result.add(new HelixAddress(scope, participant.getInstanceName(), new InetSocketAddress(
+ participant.getHostName(), Integer.valueOf(ipcPort))));
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public HelixAddress getSource(HelixMessageScope scope) {
+ // Connect or refresh connection
+ String cluster = scope.getCluster();
+ ResolverRoutingTable routingTable;
+ Spectator connection = _connections.get(cluster);
+ if (connection == null || !connection.getManager().isConnected()) {
+ synchronized (_connections) {
+ connection = _connections.get(cluster);
+ if (connection == null || !connection.getManager().isConnected()) {
+ connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS);
+ connection.init();
+ _connections.put(cluster, connection);
+ }
+ }
+ }
+ routingTable = connection.getRoutingTable();
+
+ if (scope.getSourceInstance() != null) {
+ InstanceConfig config = routingTable.getInstanceConfig(scope.getSourceInstance());
+ String ipcPort = config.getRecord().getSimpleField(IPC_PORT);
+ if (ipcPort == null) {
+ throw new IllegalStateException("No IPC address registered for source instance "
+ + scope.getSourceInstance());
+ }
+ return new HelixAddress(scope, scope.getSourceInstance(), new InetSocketAddress(
+ config.getHostName(), Integer.valueOf(ipcPort)));
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean isConnected() {
+ return _isConnected;
+ }
+
+ /**
+ * Create a Helix manager connection based on the appropriate backing store
+ * @param cluster the name of the cluster to connect to
+ * @return HelixManager instance
+ */
+ protected abstract HelixManager createManager(String cluster);
+
+ private class Spectator {
+ private final String _cluster;
+ private final HelixManager _manager;
+ private final ResolverRoutingTable _routingTable;
+ private final long _leaseLengthMs;
+ private ScheduledFuture<?> _future;
+
+ /**
+ * Initialize a spectator. This does not automatically connect.
+ * @param cluster the cluster to spectate
+ * @param leaseLengthMs the expiry of this spectator after the last request
+ */
+ public Spectator(String cluster, long leaseLengthMs) {
+ _cluster = cluster;
+ _manager = createManager(cluster);
+ _leaseLengthMs = leaseLengthMs;
+ _routingTable = new ResolverRoutingTable();
+ }
+
+ /**
+ * Connect and initialize the routing table
+ */
+ public void init() {
+ try {
+ _manager.connect();
+ _manager.addExternalViewChangeListener(_routingTable);
+ _manager.addInstanceConfigChangeListener(_routingTable);
+
+ // Force an initial refresh
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ List<ExternalView> externalViews =
+ accessor.getChildValues(accessor.keyBuilder().externalViews());
+ NotificationContext context = new NotificationContext(_manager);
+ context.setType(NotificationContext.Type.INIT);
+ _routingTable.onExternalViewChange(externalViews, context);
+ List<InstanceConfig> instanceConfigs =
+ accessor.getChildValues(accessor.keyBuilder().instanceConfigs());
+ _routingTable.onInstanceConfigChange(instanceConfigs, context);
+ } catch (Exception e) {
+ LOG.error("Error setting up routing table", e);
+ }
+ }
+
+ /**
+ * Clean up the connection to the spectated cluster
+ */
+ public void shutdown() {
+ resetFuture();
+ expire();
+ }
+
+ /**
+ * Get the dynamically-updating routing table for this cluster
+ * @return ResolverRoutingTable, a RoutingTableProvider that can answer questions about its
+ * contents
+ */
+ public ResolverRoutingTable getRoutingTable() {
+ renew();
+ return _routingTable;
+ }
+
+ public HelixManager getManager() {
+ return _manager;
+ }
+
+ private synchronized void renew() {
+ resetFuture();
+
+ // Schedule this connection to expire if not renewed quickly enough
+ _future = _executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ expire();
+ }
+ }, _leaseLengthMs, TimeUnit.MILLISECONDS);
+
+ }
+
+ private synchronized void resetFuture() {
+ if (_future != null && !_future.isDone()) {
+ _future.cancel(true);
+ }
+ }
+
+ private void expire() {
+ synchronized (_connections) {
+ _connections.remove(_cluster);
+ if (_manager != null && _manager.isConnected()) {
+ _manager.disconnect();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
new file mode 100644
index 0000000..657e8bd
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
@@ -0,0 +1,70 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Objects;
+
+import java.net.InetSocketAddress;
+
+ /**
+  * An immutable resolved message destination: the scope it was resolved from, the name of the
+  * target instance, and the socket address to connect to. Any of the three may be null.
+  */
+ public class HelixAddress {
+
+   private final HelixMessageScope scope;
+   private final String instanceName;
+   private final InetSocketAddress socketAddress;
+
+   public HelixAddress(HelixMessageScope scope, String instanceName, InetSocketAddress socketAddress) {
+     this.scope = scope;
+     this.instanceName = instanceName;
+     this.socketAddress = socketAddress;
+   }
+
+   public HelixMessageScope getScope() {
+     return scope;
+   }
+
+   public String getInstanceName() {
+     return instanceName;
+   }
+
+   public InetSocketAddress getSocketAddress() {
+     return socketAddress;
+   }
+
+   @Override
+   public String toString() {
+     return Objects.toStringHelper(this).addValue(scope).addValue(instanceName)
+         .addValue(socketAddress).toString();
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hashCode(scope, instanceName, socketAddress);
+   }
+
+   /**
+    * Two addresses are equal when scope, instance name, and socket address are all equal.
+    * Null fields are compared null-safely, matching what {@link #hashCode()} already tolerates.
+    */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (!(o instanceof HelixAddress)) {
+       return false;
+     }
+     HelixAddress a = (HelixAddress) o;
+     return Objects.equal(scope, a.scope) && Objects.equal(instanceName, a.instanceName)
+         && Objects.equal(socketAddress, a.socketAddress);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
new file mode 100644
index 0000000..ba06556
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
@@ -0,0 +1,151 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Objects;
+
+/**
+ * A definition of the addressing scope of a message.
+ */
+public class HelixMessageScope {
+ private final String _cluster;
+ private final String _resource;
+ private final String _partition;
+ private final String _state;
+ private final String _srcInstance;
+
+ private HelixMessageScope(String cluster, String resource, String partition, String state,
+ String srcInstance) {
+ _cluster = cluster;
+ _resource = resource;
+ _partition = partition;
+ _state = state;
+ _srcInstance = srcInstance;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).addValue(_cluster).addValue(_resource).addValue(_partition)
+ .addValue(_state).toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(_cluster, _resource, _partition, _state);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof HelixMessageScope) {
+ HelixMessageScope that = (HelixMessageScope) other;
+ return Objects.equal(_cluster, that._cluster) && Objects.equal(_resource, that._resource)
+ && Objects.equal(_partition, that._partition) && Objects.equal(_state, that._state);
+ }
+ return false;
+ }
+
+ public String getCluster() {
+ return _cluster;
+ }
+
+ public String getResource() {
+ return _resource;
+ }
+
+ public String getPartition() {
+ return _partition;
+ }
+
+ public String getState() {
+ return _state;
+ }
+
+ public String getSourceInstance() {
+ return _srcInstance;
+ }
+
+ public boolean isValid() {
+ return _cluster != null && ((_partition != null && _resource != null) || _partition == null);
+ }
+
+ /**
+ * Creator for a HelixMessageScope
+ */
+ public static class Builder {
+ private String _cluster;
+ private String _resource;
+ private String _partition;
+ private String _state;
+ private String _sourceInstance;
+
+ /**
+ * Associate the scope with a cluster
+ * @param cluster the cluster to scope routing to
+ * @return Builder
+ */
+ public Builder cluster(String cluster) {
+ _cluster = cluster;
+ return this;
+ }
+
+ /**
+ * Associate the scope with a resource
+ * @param resource a resource served by the cluster
+ * @return Builder
+ */
+ public Builder resource(String resource) {
+ _resource = resource;
+ return this;
+ }
+
+ /**
+ * Associate the scope with a partition
+ * @param partition a specific partition of the scoped resource
+ * @return Builder
+ */
+ public Builder partition(String partition) {
+ _partition = partition;
+ return this;
+ }
+
+ /**
+ * Associate the scope with a state
+ * @param state a state that a resource in the cluster can be in
+ * @return Builder
+ */
+ public Builder state(String state) {
+ _state = state;
+ return this;
+ }
+
+ public Builder sourceInstance(String sourceInstance) {
+ _sourceInstance = sourceInstance;
+ return this;
+ }
+
+ /**
+ * Create the scope
+ * @return HelixMessageScope instance corresponding to the built scope
+ */
+ public HelixMessageScope build() {
+ return new HelixMessageScope(_cluster, _resource, _partition, _state, _sourceInstance);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
new file mode 100644
index 0000000..b7f2f3e
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
@@ -0,0 +1,47 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+
+/**
+ * An interface that resolves a message scope to a direct address.
+ */
+public interface HelixResolver {
+ /**
+ * Initialize a connection for scope resolution.
+ */
+ void connect();
+
+ /**
+ * Tear down any state and open connections to Helix clusters.
+ */
+ void disconnect();
+
+ /**
+ * Check the connection status
+ * @return true if connected, false otherwise
+ */
+ boolean isConnected();
+
+ Set<HelixAddress> getDestinations(HelixMessageScope scope);
+
+ HelixAddress getSource(HelixMessageScope scope);
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
new file mode 100644
index 0000000..2bc5413
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
@@ -0,0 +1,92 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.spectator.RoutingTableProvider;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A routing table that can also return all resources, partitions, and states in the cluster
+ */
+public class ResolverRoutingTable extends RoutingTableProvider {
+ Map<String, Set<String>> _resourceMap;
+ Set<String> _stateSet;
+
+ /**
+ * Create the table.
+ */
+ public ResolverRoutingTable() {
+ super();
+ _resourceMap = Maps.newHashMap();
+ _stateSet = Sets.newHashSet();
+ }
+
+ /**
+ * Get all resources that are currently served in the cluster.
+ * @return set of resource names
+ */
+ public synchronized Set<String> getResources() {
+ return Sets.newHashSet(_resourceMap.keySet());
+ }
+
+ /**
+ * Get all partitions currently served for a resource.
+ * @param resource the resource for which to look up partitions
+ * @return set of partition names
+ */
+ public synchronized Set<String> getPartitions(String resource) {
+ if (_resourceMap.containsKey(resource)) {
+ return Sets.newHashSet(_resourceMap.get(resource));
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
+ /**
+ * Get all states that partitions of all resources are currently in
+ * @return set of state names
+ */
+ public synchronized Set<String> getStates() {
+ return Sets.newHashSet(_stateSet);
+ }
+
+ @Override
+ public synchronized void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext) {
+ super.onExternalViewChange(externalViewList, changeContext);
+ _resourceMap.clear();
+ _stateSet.clear();
+ for (ExternalView externalView : externalViewList) {
+ _resourceMap.put(externalView.getResourceName(), externalView.getPartitionSet());
+ for (String partition : externalView.getPartitionSet()) {
+ _stateSet.addAll(externalView.getStateMap(partition).values());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
new file mode 100644
index 0000000..48fa81a
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
@@ -0,0 +1,45 @@
+package org.apache.helix.resolver.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.resolver.AbstractHelixResolver;
+
+/**
+ * A ZooKeeper-specific {@link org.apache.helix.resolver.HelixResolver}
+ */
+public class ZKHelixResolver extends AbstractHelixResolver {
+ private final String _zkAddress;
+
+ /**
+ * Create a ZK-based Helix resolver
+ * @param zkConnectString the connection string to the ZooKeeper ensemble
+ */
+ public ZKHelixResolver(String zkConnectString) {
+ _zkAddress = zkConnectString;
+ }
+
+ @Override
+ protected HelixManager createManager(String cluster) {
+ return HelixManagerFactory.getZKHelixManager(cluster, null, InstanceType.SPECTATOR, _zkAddress);
+ }
+}
[3/4] git commit: Removed extraneous files in helix-ipc
Posted by ka...@apache.org.
Removed extraneous files in helix-ipc
Removed helix-ipc/LICENSE as per @hsaputra recommendation.
Removed some scripts intended to generate benchmark executables.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3bd83a3c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3bd83a3c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3bd83a3c
Branch: refs/heads/master
Commit: 3bd83a3c090a004337b8f6900caeed20de5d4cde
Parents: f2475fa
Author: Greg Brandt <br...@gmail.com>
Authored: Tue Aug 26 15:57:03 2014 -0700
Committer: Greg Brandt <br...@gmail.com>
Committed: Tue Aug 26 15:57:03 2014 -0700
----------------------------------------------------------------------
helix-ipc/LICENSE | 273 -------------------
helix-ipc/src/test/resources/build_benchmark.sh | 29 --
helix-ipc/src/test/resources/run_benchmark.sh | 37 ---
3 files changed, 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/3bd83a3c/helix-ipc/LICENSE
----------------------------------------------------------------------
diff --git a/helix-ipc/LICENSE b/helix-ipc/LICENSE
deleted file mode 100644
index 413913f..0000000
--- a/helix-ipc/LICENSE
+++ /dev/null
@@ -1,273 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
-
-
-For xstream:
-
-Copyright (c) 2003-2006, Joe Walnes
-Copyright (c) 2006-2009, 2011 XStream Committers
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice, this list of
-conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice, this list of
-conditions and the following disclaimer in the documentation and/or other materials provided
-with the distribution.
-
-3. Neither the name of XStream nor the names of its contributors may be used to endorse
-or promote products derived from this software without specific prior written
-permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
-EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
-OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
-SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
-WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
-DAMAGE.
-
-for jline:
-
-Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or
-without modification, are permitted provided that the following
-conditions are met:
-
-Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
-
-Redistributions in binary form must reproduce the above copyright
-notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with
-the distribution.
-
-Neither the name of JLine nor the names of its contributors
-may be used to endorse or promote products derived from this
-software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
-BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
-EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
-OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
-IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
-
http://git-wip-us.apache.org/repos/asf/helix/blob/3bd83a3c/helix-ipc/src/test/resources/build_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/build_benchmark.sh b/helix-ipc/src/test/resources/build_benchmark.sh
deleted file mode 100644
index 8b94e38..0000000
--- a/helix-ipc/src/test/resources/build_benchmark.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-cd helix-ipc
-mvn package -DskipTests
-mkdir -p /tmp/ipc-benchmark
-cp target/helix-ipc-0.7-1-jar-with-dependencies.jar /tmp/ipc-benchmark
-cp target/helix-ipc-0.7.1-tests.jar /tmp/ipc-benchmark
-cp src/test/resources/run_benchmark.sh /tmp/ipc-benchmark
-cd /tmp
-tar cvzf ipc-benchmark.tar.gz ipc-benchmark
http://git-wip-us.apache.org/repos/asf/helix/blob/3bd83a3c/helix-ipc/src/test/resources/run_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/run_benchmark.sh b/helix-ipc/src/test/resources/run_benchmark.sh
deleted file mode 100644
index 0b9373d..0000000
--- a/helix-ipc/src/test/resources/run_benchmark.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-$JAVA_HOME/bin/java -cp helix-ipc-0.7.1-jar-with-dependencies.jar:helix-ipc-0.7.1-tests.jar \
- -Xloggc:gc_$JMX_PORT.log \
- -XX:+UseConcMarkSweepGC \
- -XX:+UseCMSInitiatingOccupancyOnly \
- -XX:CMSInitiatingOccupancyFraction=65 \
- -XX:NewRatio=2 \
- -Xmx4g \
- -Xms4g \
- -Dcom.sun.management.jmxremote \
- -Dcom.sun.management.jmxremote.port=$JMX_PORT \
- -Dcom.sun.management.jmxremote.authenticate=false \
- -Dcom.sun.management.jmxremote.ssl=false \
- -Dio.netty.resourceLeakDetection \
- -Dio.netty.allocator.type=pooled \
- -Dio.netty.noPreferDirect=false \
- org.apache.helix.ipc.benchmark.BenchmarkDriver $@