You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/08/28 20:00:15 UTC

[1/4] [HELIX-470] Netty-based IPC layer

Repository: helix
Updated Branches:
  refs/heads/master 59b4bbb0b -> d8ec1ae75


http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java b/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
new file mode 100644
index 0000000..b399377
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
@@ -0,0 +1,353 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.ipc.netty.NettyHelixIPCService;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.helix.resolver.HelixResolver;
+import org.apache.helix.resolver.zk.ZKHelixResolver;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestNettyHelixIPCService extends ZkTestBase {
+
+  private static final Logger LOG = Logger.getLogger(TestNettyHelixIPCService.class);
+
+  private static final String CLUSTER_NAME = "TEST_CLUSTER";
+  private static final String RESOURCE_NAME = "MyResource";
+
+  private int firstPort;
+  private int secondPort;
+  private HelixManager controller;
+  private HelixManager firstNode;
+  private HelixManager secondNode;
+  private HelixResolver firstResolver;
+  private HelixResolver secondResolver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Allocate test resources
+    firstPort = TestHelper.getRandomPort();
+    secondPort = TestHelper.getRandomPort();
+
+    // Setup cluster
+    ClusterSetup clusterSetup = new ClusterSetup(_zkaddr);
+    clusterSetup.addCluster(CLUSTER_NAME, true);
+    clusterSetup.addInstanceToCluster(CLUSTER_NAME, "localhost_" + firstPort);
+    clusterSetup.addInstanceToCluster(CLUSTER_NAME, "localhost_" + secondPort);
+
+    // Start Helix agents
+    controller =
+        HelixControllerMain.startHelixController(_zkaddr, CLUSTER_NAME, "CONTROLLER", "STANDALONE");
+    firstNode =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "localhost_" + firstPort,
+            InstanceType.PARTICIPANT, _zkaddr);
+    secondNode =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "localhost_" + secondPort,
+            InstanceType.PARTICIPANT, _zkaddr);
+
+    // Connect participants
+    firstNode.getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("OnlineOffline"), new DummyStateModelFactory());
+    firstNode.connect();
+    secondNode.getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("OnlineOffline"), new DummyStateModelFactory());
+    secondNode.connect();
+
+    // Add a resource
+    clusterSetup.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, 4, "OnlineOffline");
+    clusterSetup.rebalanceResource(CLUSTER_NAME, RESOURCE_NAME, 1);
+
+    // Wait for External view convergence
+    ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+        _zkaddr, CLUSTER_NAME), 10000);
+
+    // Connect resolvers
+    firstResolver = new ZKHelixResolver(_zkaddr);
+    firstResolver.connect();
+    secondResolver = new ZKHelixResolver(_zkaddr);
+    secondResolver.connect();
+
+    // Configure
+    firstNode.getConfigAccessor().set(
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+            .forCluster(firstNode.getClusterName()).forParticipant(firstNode.getInstanceName())
+            .build(), HelixIPCService.IPC_PORT, String.valueOf(firstPort));
+    secondNode.getConfigAccessor().set(
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+            .forCluster(secondNode.getClusterName()).forParticipant(secondNode.getInstanceName())
+            .build(), HelixIPCService.IPC_PORT, String.valueOf(secondPort));
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    firstNode.disconnect();
+    secondNode.disconnect();
+    controller.disconnect();
+  }
+
+  @Test
+  public void testService() throws Exception {
+    final int numMessages = 1000;
+    final int messageType = 1;
+
+    // Start first IPC service w/ counter
+    final ConcurrentMap<String, AtomicInteger> firstCounts =
+        new ConcurrentHashMap<String, AtomicInteger>();
+    final HelixIPCService firstIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            firstNode.getInstanceName()).setPort(firstPort));
+    firstIPC.registerCallback(messageType, new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        String key = scope.getPartition() + ":" + scope.getState();
+        firstCounts.putIfAbsent(key, new AtomicInteger());
+        firstCounts.get(key).incrementAndGet();
+      }
+    });
+    firstIPC.start();
+
+    // Start second IPC Service w/ counter
+    final ConcurrentMap<String, AtomicInteger> secondCounts =
+        new ConcurrentHashMap<String, AtomicInteger>();
+    final HelixIPCService secondIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            secondNode.getInstanceName()).setPort(secondPort));
+    secondIPC.registerCallback(messageType, new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        String key = scope.getPartition() + ":" + scope.getState();
+        secondCounts.putIfAbsent(key, new AtomicInteger());
+        secondCounts.get(key).incrementAndGet();
+      }
+    });
+    secondIPC.start();
+
+    // Allow resolver callbacks to fire
+    Thread.sleep(500);
+
+    // Find all partitions on second node...
+    String secondName = "localhost_" + secondPort;
+    Set<String> secondPartitions = new HashSet<String>();
+    IdealState idealState =
+        controller.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
+    for (String partitionName : idealState.getPartitionSet()) {
+      for (Map.Entry<String, String> stateEntry : idealState.getInstanceStateMap(partitionName)
+          .entrySet()) {
+        if (stateEntry.getKey().equals(secondName)) {
+          secondPartitions.add(partitionName);
+        }
+      }
+    }
+
+    // And use first node to send messages to them
+    for (String partitionName : secondPartitions) {
+      for (int i = 0; i < numMessages; i++) {
+        HelixMessageScope scope =
+            new HelixMessageScope.Builder().cluster(firstNode.getClusterName())
+                .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+
+        Set<HelixAddress> destinations = firstResolver.getDestinations(scope);
+        for (HelixAddress destination : destinations) {
+          ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+          firstIPC.send(destination, messageType, UUID.randomUUID(), message);
+        }
+      }
+    }
+
+    // Loopback
+    for (String partitionName : secondPartitions) {
+      for (int i = 0; i < numMessages; i++) {
+        HelixMessageScope scope =
+            new HelixMessageScope.Builder().cluster(secondNode.getClusterName())
+                .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+
+        Set<HelixAddress> destinations = secondResolver.getDestinations(scope);
+        for (HelixAddress destination : destinations) {
+          ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+          secondIPC.send(destination, messageType, UUID.randomUUID(), message);
+        }
+      }
+    }
+
+    // Check
+    Thread.sleep(500); // just in case
+    for (String partitionName : secondPartitions) {
+      AtomicInteger count = secondCounts.get(partitionName + ":ONLINE");
+      Assert.assertNotNull(count);
+      Assert.assertEquals(count.get(), 2 * numMessages);
+    }
+
+    // Shutdown
+    firstIPC.shutdown();
+    secondIPC.shutdown();
+  }
+
+  @Test
+  public void testMessageManager() throws Exception {
+    final int numMessages = 1000;
+    final int messageType = 1;
+    final int ackMessageType = 2;
+
+    // First IPC service
+    final HelixIPCService firstIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            firstNode.getInstanceName()).setPort(firstPort));
+    firstIPC.registerCallback(messageType, new HelixIPCCallback() {
+      final Random random = new Random();
+
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        if (random.nextInt() % 2 == 0) {
+          HelixAddress sender = firstResolver.getSource(scope);
+          firstIPC.send(sender, ackMessageType, messageId, null);
+        }
+      }
+    });
+    firstIPC.start();
+
+    // Second IPC service
+    final HelixIPCService secondIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            secondNode.getInstanceName()).setPort(secondPort));
+    secondIPC.registerCallback(messageType, new HelixIPCCallback() {
+      final Random random = new Random();
+
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        if (random.nextInt() % 2 == 0) {
+          HelixAddress sender = secondResolver.getSource(scope);
+          secondIPC.send(sender, ackMessageType, messageId, null);
+        }
+      }
+    });
+    secondIPC.start();
+
+    // Allow resolver callbacks to fire
+    Thread.sleep(500);
+
+    // Start state machine (uses first, sends to second)
+    final AtomicInteger numAcks = new AtomicInteger();
+    final AtomicInteger numErrors = new AtomicInteger();
+    HelixIPCService messageManager =
+        new HelixIPCMessageManager(Executors.newSingleThreadScheduledExecutor(), firstIPC, 300, -1);
+    messageManager.registerCallback(ackMessageType, new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        numAcks.incrementAndGet();
+      }
+    });
+    messageManager.start();
+
+    // Find all partitions on second node...
+    String secondName = "localhost_" + secondPort;
+    Set<String> secondPartitions = new HashSet<String>();
+    IdealState idealState =
+        controller.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
+    for (String partitionName : idealState.getPartitionSet()) {
+      for (Map.Entry<String, String> stateEntry : idealState.getInstanceStateMap(partitionName)
+          .entrySet()) {
+        if (stateEntry.getKey().equals(secondName)) {
+          secondPartitions.add(partitionName);
+        }
+      }
+    }
+
+    // And use first node to send messages to them
+    for (String partitionName : secondPartitions) {
+      for (int i = 0; i < numMessages; i++) {
+        HelixMessageScope scope =
+            new HelixMessageScope.Builder().cluster(firstNode.getClusterName())
+                .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+        Set<HelixAddress> destinations = firstResolver.getDestinations(scope);
+        for (HelixAddress destination : destinations) {
+          ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+          messageManager.send(destination, messageType, UUID.randomUUID(), message);
+        }
+      }
+    }
+
+    // Ensure they're all ack'ed (tests retry logic because only every other one is acked)
+    Thread.sleep(5000);
+    Assert.assertEquals(numAcks.get() + numErrors.get(), numMessages * secondPartitions.size());
+
+    // Shutdown
+    messageManager.shutdown();
+    firstIPC.shutdown();
+    secondIPC.shutdown();
+  }
+
+  public static class DummyStateModelFactory extends
+      StateTransitionHandlerFactory<TransitionHandler> {
+    @Override
+    public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
+      return new DummyStateModel();
+    }
+
+    @StateModelInfo(states = "{'OFFLINE', 'ONLINE'}", initialState = "OFFLINE")
+    public static class DummyStateModel extends TransitionHandler {
+      @Transition(from = "OFFLINE", to = "ONLINE")
+      public void fromOfflineToOnline(Message message, NotificationContext context) {
+        LOG.info(message);
+      }
+
+      @Transition(from = "ONLINE", to = "OFFLINE")
+      public void fromOnlineToOffline(Message message, NotificationContext context) {
+        LOG.info(message);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
new file mode 100644
index 0000000..2d682f7
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
@@ -0,0 +1,221 @@
+package org.apache.helix.ipc.benchmark;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.MXBean;
+import javax.management.ObjectName;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.helix.ipc.HelixIPCCallback;
+import org.apache.helix.ipc.HelixIPCService;
+import org.apache.helix.ipc.netty.NettyHelixIPCService;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Run with following to enable JMX:
+ * -Dcom.sun.management.jmxremote <br/>
+ * -Dcom.sun.management.jmxremote.port=10000 <br/>
+ * -Dcom.sun.management.jmxremote.authenticate=false <br/>
+ * -Dcom.sun.management.jmxremote.ssl=false <br/>
+ */
+public class BenchmarkDriver implements Runnable {
+
+  private static final int MESSAGE_TYPE = 1025;
+
+  private final int port;
+  private final int numPartitions;
+  private final AtomicBoolean isShutdown;
+  private final byte[] messageBytes;
+  private final int numConnections;
+
+  private HelixIPCService ipcService;
+  private String localhost;
+  private Thread[] trafficThreads;
+
+  public BenchmarkDriver(int port, int numPartitions, int numThreads, int messageSize,
+      int numConnections) {
+    this.port = port;
+    this.numPartitions = numPartitions;
+    this.isShutdown = new AtomicBoolean(true);
+    this.trafficThreads = new Thread[numThreads];
+    this.numConnections = numConnections;
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < messageSize; i++) {
+      sb.append("A");
+    }
+    this.messageBytes = sb.toString().getBytes();
+  }
+
+  @Override
+  public void run() {
+    try {
+      // Register controller MBean
+      final BenchmarkDriver driver = this;
+      ManagementFactory.getPlatformMBeanServer().registerMBean(new Controller() {
+        @Override
+        public void startTraffic(String remoteHost, int remotePort) {
+          driver.startTraffic(remoteHost, remotePort);
+        }
+
+        @Override
+        public void stopTraffic() {
+          driver.stopTraffic();
+        }
+      }, new ObjectName("org.apache.helix:type=BenchmarkDriver"));
+
+      // The local server
+      localhost = InetAddress.getLocalHost().getCanonicalHostName();
+      ipcService =
+          new NettyHelixIPCService(new NettyHelixIPCService.Config()
+              .setInstanceName(localhost + "_" + port).setPort(port)
+              .setNumConnections(numConnections));
+
+      // Counts number of messages received, and ack them
+      ipcService.registerCallback(MESSAGE_TYPE, new HelixIPCCallback() {
+        @Override
+        public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+          // Do nothing
+        }
+      });
+
+      ipcService.start();
+      System.out.println("Started IPC service on "
+          + InetAddress.getLocalHost().getCanonicalHostName() + ":" + port);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void startTraffic(final String remoteHost, final int remotePort) {
+    if (isShutdown.getAndSet(false)) {
+      System.out.println("Starting " + trafficThreads.length + " threads to generate traffic");
+      for (int i = 0; i < trafficThreads.length; i++) {
+        Thread t = new Thread() {
+          @Override
+          public void run() {
+            ByteBuf m = ByteBufAllocator.DEFAULT.buffer(messageBytes.length);
+            m.writeBytes(messageBytes);
+            while (!isShutdown.get()) {
+              for (int i = 0; i < numPartitions; i++) {
+                HelixMessageScope scope =
+                    new HelixMessageScope.Builder().cluster("CLUSTER").resource("RESOURCE")
+                        .partition("PARTITION_" + i).sourceInstance(localhost + "_" + port).build();
+
+                Set<HelixAddress> destinations =
+                    ImmutableSet.of(new HelixAddress(scope, remoteHost + "_" + remotePort,
+                        new InetSocketAddress(remoteHost, remotePort)));
+
+                UUID uuid = UUID.randomUUID();
+
+                try {
+                  for (HelixAddress destination : destinations) {
+                    m.retain();
+                    ipcService.send(destination, MESSAGE_TYPE, uuid, m);
+                  }
+                } catch (Exception e) {
+                  e.printStackTrace();
+                }
+              }
+            }
+          }
+        };
+        t.start();
+        trafficThreads[i] = t;
+      }
+      System.out.println("Started traffic to " + remoteHost + ":" + remotePort);
+    }
+  }
+
+  private void stopTraffic() {
+    if (!isShutdown.getAndSet(true)) {
+      try {
+        for (Thread t : trafficThreads) {
+          t.join();
+        }
+        System.out.println("Stopped traffic");
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @MXBean
+  public interface Controller {
+    void startTraffic(String remoteHost, int remotePort);
+
+    void stopTraffic();
+  }
+
+  public static void main(String[] args) throws Exception {
+    BasicConfigurator.configure();
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+
+    Options options = new Options();
+    options.addOption("partitions", true, "Number of partitions");
+    options.addOption("threads", true, "Number of threads");
+    options.addOption("messageSize", true, "Message size in bytes");
+    options.addOption("numConnections", true, "Number of connections between nodes");
+
+    CommandLine commandLine = new GnuParser().parse(options, args);
+
+    if (commandLine.getArgs().length != 1) {
+      HelpFormatter helpFormatter = new HelpFormatter();
+      helpFormatter.printHelp("usage: [options] port", options);
+      System.exit(1);
+    }
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        latch.countDown();
+      }
+    });
+
+    new BenchmarkDriver(Integer.parseInt(commandLine.getArgs()[0]), Integer.parseInt(commandLine
+        .getOptionValue("partitions", "1")), Integer.parseInt(commandLine.getOptionValue("threads",
+        "1")), Integer.parseInt(commandLine.getOptionValue("messageSize", "1024")),
+        Integer.parseInt(commandLine.getOptionValue("numConnections", "1"))).run();
+
+    latch.await();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java b/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
new file mode 100644
index 0000000..4e802d7
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
@@ -0,0 +1,161 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.resolver.zk.ZKHelixResolver;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Test basic routing table lookups for a ZK-based Helix resolver.
+ */
+public class TestZKHelixResolver extends ZkTestBase {
+  private static final int NUM_PARTICIPANTS = 2;
+  private static final int NUM_PARTITIONS = 2;
+  private static final String CLUSTER_NAME = TestZKHelixResolver.class.getSimpleName();
+  private static final String RESOURCE_NAME = "MyResource";
+  private MockParticipant[] _participants;
+  private MockController _controller;
+  private ClusterSetup _setupTool;
+  private HelixResolver _resolver;
+  private Map<String, InetSocketAddress> _socketMap;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Set up cluster
+    _setupTool = new ClusterSetup(_zkclient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS, "OnlineOffline",
+        IdealState.RebalanceMode.FULL_AUTO.toString());
+    _setupTool.rebalanceCluster(CLUSTER_NAME, RESOURCE_NAME, 1, RESOURCE_NAME, null);
+
+    // Set up and start instances
+    _socketMap = Maps.newHashMap();
+    HelixAdmin admin = _setupTool.getClusterManagementTool();
+    _participants = new MockParticipant[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String host = "localhost";
+      int port = i;
+      int ipcPort = i + 100;
+      String instanceName = host + "_" + port;
+      InstanceConfig config = new InstanceConfig(instanceName);
+      config.setHostName(host);
+      config.setPort(Integer.toString(port));
+      config.getRecord().setSimpleField("IPC_PORT", Integer.toString(ipcPort));
+      admin.addInstance(CLUSTER_NAME, config);
+      _socketMap.put(instanceName, new InetSocketAddress(host, ipcPort));
+      _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    _controller = new MockController(_zkaddr, CLUSTER_NAME, "controller_0");
+    _controller.syncStart();
+
+    // Connect a resolver
+    _resolver = new ZKHelixResolver(_zkaddr);
+    _resolver.connect();
+
+    // Wait for External view convergence
+    ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+        _zkaddr, CLUSTER_NAME), 10000);
+  }
+
+  @Test
+  public void testResolution() {
+    HelixMessageScope clusterScope = new HelixMessageScope.Builder().cluster(CLUSTER_NAME).build();
+    Set<HelixAddress> destinations = _resolver.getDestinations(clusterScope);
+    Assert.assertNotNull(destinations);
+    Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
+    for (HelixAddress destination : destinations) {
+      addresses.add(destination.getSocketAddress());
+    }
+    Assert.assertTrue(addresses.containsAll(_socketMap.values()), "Expected " + _socketMap.values()
+        + ", found " + addresses);
+
+    HelixMessageScope resourceScope =
+        new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME).build();
+    destinations = _resolver.getDestinations(resourceScope);
+    Assert.assertNotNull(destinations);
+    addresses.clear();
+    for (HelixAddress destination : destinations) {
+      addresses.add(destination.getSocketAddress());
+    }
+    Assert.assertTrue(addresses.containsAll(_socketMap.values()), "Expected " + _socketMap.values()
+        + ", found " + addresses);
+
+    HelixMessageScope partition0Scope =
+        new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME)
+            .partition(RESOURCE_NAME + "_0").build();
+    destinations = _resolver.getDestinations(partition0Scope);
+    Assert.assertNotNull(destinations);
+    ExternalView externalView =
+        _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME);
+    Set<String> instanceSet = externalView.getStateMap(RESOURCE_NAME + "_0").keySet();
+    Set<InetSocketAddress> expectedSocketAddrs = Sets.newHashSet();
+    for (String instanceName : instanceSet) {
+      expectedSocketAddrs.add(_socketMap.get(instanceName));
+    }
+    addresses.clear();
+    for (HelixAddress destination : destinations) {
+      addresses.add(destination.getSocketAddress());
+    }
+    Assert.assertEquals(addresses, expectedSocketAddrs, "Expected " + expectedSocketAddrs
+        + ", found " + addresses);
+
+    HelixMessageScope sourceInstanceScope =
+        new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME)
+            .partition(RESOURCE_NAME + "_0").sourceInstance(_participants[0].getInstanceName())
+            .build();
+    HelixAddress sourceAddress = _resolver.getSource(sourceInstanceScope);
+    Assert.assertNotNull(sourceAddress);
+    Assert.assertEquals(sourceAddress.getSocketAddress(),
+        _socketMap.get(_participants[0].getInstanceName()));
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _resolver.disconnect();
+    _controller.syncStop();
+    for (MockParticipant participant : _participants) {
+      participant.syncStop();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/resources/build_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/build_benchmark.sh b/helix-ipc/src/test/resources/build_benchmark.sh
new file mode 100644
index 0000000..8b94e38
--- /dev/null
+++ b/helix-ipc/src/test/resources/build_benchmark.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cd helix-ipc
+mvn package -DskipTests
+mkdir -p /tmp/ipc-benchmark
+cp target/helix-ipc-0.7-1-jar-with-dependencies.jar /tmp/ipc-benchmark
+cp target/helix-ipc-0.7.1-tests.jar /tmp/ipc-benchmark
+cp src/test/resources/run_benchmark.sh /tmp/ipc-benchmark
+cd /tmp
+tar cvzf ipc-benchmark.tar.gz ipc-benchmark

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/resources/run_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/run_benchmark.sh b/helix-ipc/src/test/resources/run_benchmark.sh
new file mode 100644
index 0000000..0b9373d
--- /dev/null
+++ b/helix-ipc/src/test/resources/run_benchmark.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+$JAVA_HOME/bin/java -cp helix-ipc-0.7.1-jar-with-dependencies.jar:helix-ipc-0.7.1-tests.jar \
+    -Xloggc:gc_$JMX_PORT.log \
+    -XX:+UseConcMarkSweepGC \
+    -XX:+UseCMSInitiatingOccupancyOnly \
+    -XX:CMSInitiatingOccupancyFraction=65 \
+    -XX:NewRatio=2 \
+    -Xmx4g \
+    -Xms4g \
+    -Dcom.sun.management.jmxremote \
+    -Dcom.sun.management.jmxremote.port=$JMX_PORT \
+    -Dcom.sun.management.jmxremote.authenticate=false \
+    -Dcom.sun.management.jmxremote.ssl=false \
+    -Dio.netty.resourceLeakDetection \
+    -Dio.netty.allocator.type=pooled \
+    -Dio.netty.noPreferDirect=false \
+    org.apache.helix.ipc.benchmark.BenchmarkDriver $@

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4bd6f2e..ae49319 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,6 +199,7 @@ under the License.
     <module>helix-agent</module>
     <module>helix-provisioning</module>
     <module>helix-examples</module>
+    <module>helix-ipc</module>
     <module>recipes</module>
   </modules>
 


[4/4] git commit: Merge branch 'master' of https://github.com/brandtg/helix (closes #2)

Posted by ka...@apache.org.
Merge branch 'master' of https://github.com/brandtg/helix (closes #2)


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d8ec1ae7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d8ec1ae7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d8ec1ae7

Branch: refs/heads/master
Commit: d8ec1ae7560118363e843283ce2f15f611aea099
Parents: 59b4bbb 3bd83a3
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Aug 28 09:49:03 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Aug 28 10:00:43 2014 -0700

----------------------------------------------------------------------
 NOTICE                                          |   8 +
 .../helix/spectator/RoutingTableProvider.java   |  24 +-
 .../test/java/org/apache/helix/TestHelper.java  |  10 +
 helix-ipc/LICENSE                               | 202 ++++++++
 helix-ipc/NOTICE                                |  41 ++
 helix-ipc/pom.xml                               | 141 ++++++
 helix-ipc/src/assemble/assembly.xml             |  60 +++
 helix-ipc/src/main/config/log4j.properties      |  31 ++
 .../org/apache/helix/ipc/HelixIPCCallback.java  |  32 ++
 .../helix/ipc/HelixIPCMessageManager.java       | 163 ++++++
 .../org/apache/helix/ipc/HelixIPCService.java   |  49 ++
 .../helix/ipc/netty/NettyHelixIPCService.java   | 490 +++++++++++++++++++
 .../helix/resolver/AbstractHelixResolver.java   | 295 +++++++++++
 .../org/apache/helix/resolver/HelixAddress.java |  70 +++
 .../helix/resolver/HelixMessageScope.java       | 151 ++++++
 .../apache/helix/resolver/HelixResolver.java    |  47 ++
 .../helix/resolver/ResolverRoutingTable.java    |  92 ++++
 .../helix/resolver/zk/ZKHelixResolver.java      |  45 ++
 .../helix/ipc/TestNettyHelixIPCService.java     | 353 +++++++++++++
 .../helix/ipc/benchmark/BenchmarkDriver.java    | 221 +++++++++
 .../helix/resolver/TestZKHelixResolver.java     | 161 ++++++
 pom.xml                                         |   1 +
 22 files changed, 2685 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d8ec1ae7/NOTICE
----------------------------------------------------------------------
diff --cc NOTICE
index ca4a5e0,ca4a5e0..2d70d2e
--- a/NOTICE
+++ b/NOTICE
@@@ -32,6 -32,6 +32,14 @@@ This product includes software develope
  zkclient (https://github.com/sgroschupf/zkclient).
  Licensed under the Apache License 2.0.
  
++This product includes software developed at
++netty (http://netty.io).
++Licensed under the Apache License 2.0.
++
++This product includes software developed at
++Metrics (http://metrics.codahale.com).
++Licensed under the Apache License 2.0.
++
  II. License Summary
  - Apache License 2.0
  - BSD License

http://git-wip-us.apache.org/repos/asf/helix/blob/d8ec1ae7/helix-ipc/LICENSE
----------------------------------------------------------------------
diff --cc helix-ipc/LICENSE
index 0000000,0000000..d645695
new file mode 100644
--- /dev/null
+++ b/helix-ipc/LICENSE
@@@ -1,0 -1,0 +1,202 @@@
++
++                                 Apache License
++                           Version 2.0, January 2004
++                        http://www.apache.org/licenses/
++
++   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
++
++   1. Definitions.
++
++      "License" shall mean the terms and conditions for use, reproduction,
++      and distribution as defined by Sections 1 through 9 of this document.
++
++      "Licensor" shall mean the copyright owner or entity authorized by
++      the copyright owner that is granting the License.
++
++      "Legal Entity" shall mean the union of the acting entity and all
++      other entities that control, are controlled by, or are under common
++      control with that entity. For the purposes of this definition,
++      "control" means (i) the power, direct or indirect, to cause the
++      direction or management of such entity, whether by contract or
++      otherwise, or (ii) ownership of fifty percent (50%) or more of the
++      outstanding shares, or (iii) beneficial ownership of such entity.
++
++      "You" (or "Your") shall mean an individual or Legal Entity
++      exercising permissions granted by this License.
++
++      "Source" form shall mean the preferred form for making modifications,
++      including but not limited to software source code, documentation
++      source, and configuration files.
++
++      "Object" form shall mean any form resulting from mechanical
++      transformation or translation of a Source form, including but
++      not limited to compiled object code, generated documentation,
++      and conversions to other media types.
++
++      "Work" shall mean the work of authorship, whether in Source or
++      Object form, made available under the License, as indicated by a
++      copyright notice that is included in or attached to the work
++      (an example is provided in the Appendix below).
++
++      "Derivative Works" shall mean any work, whether in Source or Object
++      form, that is based on (or derived from) the Work and for which the
++      editorial revisions, annotations, elaborations, or other modifications
++      represent, as a whole, an original work of authorship. For the purposes
++      of this License, Derivative Works shall not include works that remain
++      separable from, or merely link (or bind by name) to the interfaces of,
++      the Work and Derivative Works thereof.
++
++      "Contribution" shall mean any work of authorship, including
++      the original version of the Work and any modifications or additions
++      to that Work or Derivative Works thereof, that is intentionally
++      submitted to Licensor for inclusion in the Work by the copyright owner
++      or by an individual or Legal Entity authorized to submit on behalf of
++      the copyright owner. For the purposes of this definition, "submitted"
++      means any form of electronic, verbal, or written communication sent
++      to the Licensor or its representatives, including but not limited to
++      communication on electronic mailing lists, source code control systems,
++      and issue tracking systems that are managed by, or on behalf of, the
++      Licensor for the purpose of discussing and improving the Work, but
++      excluding communication that is conspicuously marked or otherwise
++      designated in writing by the copyright owner as "Not a Contribution."
++
++      "Contributor" shall mean Licensor and any individual or Legal Entity
++      on behalf of whom a Contribution has been received by Licensor and
++      subsequently incorporated within the Work.
++
++   2. Grant of Copyright License. Subject to the terms and conditions of
++      this License, each Contributor hereby grants to You a perpetual,
++      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
++      copyright license to reproduce, prepare Derivative Works of,
++      publicly display, publicly perform, sublicense, and distribute the
++      Work and such Derivative Works in Source or Object form.
++
++   3. Grant of Patent License. Subject to the terms and conditions of
++      this License, each Contributor hereby grants to You a perpetual,
++      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
++      (except as stated in this section) patent license to make, have made,
++      use, offer to sell, sell, import, and otherwise transfer the Work,
++      where such license applies only to those patent claims licensable
++      by such Contributor that are necessarily infringed by their
++      Contribution(s) alone or by combination of their Contribution(s)
++      with the Work to which such Contribution(s) was submitted. If You
++      institute patent litigation against any entity (including a
++      cross-claim or counterclaim in a lawsuit) alleging that the Work
++      or a Contribution incorporated within the Work constitutes direct
++      or contributory patent infringement, then any patent licenses
++      granted to You under this License for that Work shall terminate
++      as of the date such litigation is filed.
++
++   4. Redistribution. You may reproduce and distribute copies of the
++      Work or Derivative Works thereof in any medium, with or without
++      modifications, and in Source or Object form, provided that You
++      meet the following conditions:
++
++      (a) You must give any other recipients of the Work or
++          Derivative Works a copy of this License; and
++
++      (b) You must cause any modified files to carry prominent notices
++          stating that You changed the files; and
++
++      (c) You must retain, in the Source form of any Derivative Works
++          that You distribute, all copyright, patent, trademark, and
++          attribution notices from the Source form of the Work,
++          excluding those notices that do not pertain to any part of
++          the Derivative Works; and
++
++      (d) If the Work includes a "NOTICE" text file as part of its
++          distribution, then any Derivative Works that You distribute must
++          include a readable copy of the attribution notices contained
++          within such NOTICE file, excluding those notices that do not
++          pertain to any part of the Derivative Works, in at least one
++          of the following places: within a NOTICE text file distributed
++          as part of the Derivative Works; within the Source form or
++          documentation, if provided along with the Derivative Works; or,
++          within a display generated by the Derivative Works, if and
++          wherever such third-party notices normally appear. The contents
++          of the NOTICE file are for informational purposes only and
++          do not modify the License. You may add Your own attribution
++          notices within Derivative Works that You distribute, alongside
++          or as an addendum to the NOTICE text from the Work, provided
++          that such additional attribution notices cannot be construed
++          as modifying the License.
++
++      You may add Your own copyright statement to Your modifications and
++      may provide additional or different license terms and conditions
++      for use, reproduction, or distribution of Your modifications, or
++      for any such Derivative Works as a whole, provided Your use,
++      reproduction, and distribution of the Work otherwise complies with
++      the conditions stated in this License.
++
++   5. Submission of Contributions. Unless You explicitly state otherwise,
++      any Contribution intentionally submitted for inclusion in the Work
++      by You to the Licensor shall be under the terms and conditions of
++      this License, without any additional terms or conditions.
++      Notwithstanding the above, nothing herein shall supersede or modify
++      the terms of any separate license agreement you may have executed
++      with Licensor regarding such Contributions.
++
++   6. Trademarks. This License does not grant permission to use the trade
++      names, trademarks, service marks, or product names of the Licensor,
++      except as required for reasonable and customary use in describing the
++      origin of the Work and reproducing the content of the NOTICE file.
++
++   7. Disclaimer of Warranty. Unless required by applicable law or
++      agreed to in writing, Licensor provides the Work (and each
++      Contributor provides its Contributions) on an "AS IS" BASIS,
++      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++      implied, including, without limitation, any warranties or conditions
++      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
++      PARTICULAR PURPOSE. You are solely responsible for determining the
++      appropriateness of using or redistributing the Work and assume any
++      risks associated with Your exercise of permissions under this License.
++
++   8. Limitation of Liability. In no event and under no legal theory,
++      whether in tort (including negligence), contract, or otherwise,
++      unless required by applicable law (such as deliberate and grossly
++      negligent acts) or agreed to in writing, shall any Contributor be
++      liable to You for damages, including any direct, indirect, special,
++      incidental, or consequential damages of any character arising as a
++      result of this License or out of the use or inability to use the
++      Work (including but not limited to damages for loss of goodwill,
++      work stoppage, computer failure or malfunction, or any and all
++      other commercial damages or losses), even if such Contributor
++      has been advised of the possibility of such damages.
++
++   9. Accepting Warranty or Additional Liability. While redistributing
++      the Work or Derivative Works thereof, You may choose to offer,
++      and charge a fee for, acceptance of support, warranty, indemnity,
++      or other liability obligations and/or rights consistent with this
++      License. However, in accepting such obligations, You may act only
++      on Your own behalf and on Your sole responsibility, not on behalf
++      of any other Contributor, and only if You agree to indemnify,
++      defend, and hold each Contributor harmless for any liability
++      incurred by, or claims asserted against, such Contributor by reason
++      of your accepting any such warranty or additional liability.
++
++   END OF TERMS AND CONDITIONS
++
++   APPENDIX: How to apply the Apache License to your work.
++
++      To apply the Apache License to your work, attach the following
++      boilerplate notice, with the fields enclosed by brackets "[]"
++      replaced with your own identifying information. (Don't include
++      the brackets!)  The text should be enclosed in the appropriate
++      comment syntax for the file format. We also recommend that a
++      file or class name and description of purpose be included on the
++      same "printed page" as the copyright notice for easier
++      identification within third-party archives.
++
++   Copyright [yyyy] [name of copyright owner]
++
++   Licensed under the Apache License, Version 2.0 (the "License");
++   you may not use this file except in compliance with the License.
++   You may obtain a copy of the License at
++
++       http://www.apache.org/licenses/LICENSE-2.0
++
++   Unless required by applicable law or agreed to in writing, software
++   distributed under the License is distributed on an "AS IS" BASIS,
++   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++   See the License for the specific language governing permissions and
++   limitations under the License.

http://git-wip-us.apache.org/repos/asf/helix/blob/d8ec1ae7/helix-ipc/NOTICE
----------------------------------------------------------------------
diff --cc helix-ipc/NOTICE
index 0000000,1ee0d24..6f657e3
mode 000000,100644..100644
--- a/helix-ipc/NOTICE
+++ b/helix-ipc/NOTICE
@@@ -1,0 -1,33 +1,41 @@@
+ Apache Helix
+ Copyright 2014 The Apache Software Foundation
+ 
+ 
+ I. Included Software
+ 
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+ Licensed under the Apache License 2.0.
+ 
+ This product includes software developed at
+ Codehaus (http://www.codehaus.org/).
+ Licensed under the BSD License.
+ 
+ This product includes software developed at
+ jline (http://jline.sourceforge.net/).
+ Licensed under the BSD License.
+ 
+ This product includes software developed at
+ Google (http://www.google.com/).
+ Licensed under the Apache License 2.0.
+ 
+ This product includes software developed at
+ snakeyaml (http://www.snakeyaml.org/).
+ Licensed under the Apache License 2.0.
+ 
+ This product includes software developed at
+ zkclient (https://github.com/sgroschupf/zkclient).
+ Licensed under the Apache License 2.0.
+ 
++This product includes software developed at
++netty (http://netty.io).
++Licensed under the Apache License 2.0.
++
++This product includes software developed at
++Metrics (http://metrics.codahale.com).
++Licensed under the Apache License 2.0.
++
+ II. License Summary
+ - Apache License 2.0
+ - BSD License


[2/4] git commit: [HELIX-470] Netty-based IPC layer

Posted by ka...@apache.org.
[HELIX-470] Netty-based IPC layer


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f2475fa9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f2475fa9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f2475fa9

Branch: refs/heads/master
Commit: f2475fa9a6123052fea2588cdd4e439ddc7af020
Parents: bfe5d2d
Author: Greg Brandt <br...@gmail.com>
Authored: Tue Aug 26 13:14:36 2014 -0700
Committer: Greg Brandt <br...@gmail.com>
Committed: Tue Aug 26 13:14:36 2014 -0700

----------------------------------------------------------------------
 .../helix/spectator/RoutingTableProvider.java   |  24 +-
 .../test/java/org/apache/helix/TestHelper.java  |  10 +
 helix-ipc/LICENSE                               | 273 +++++++++++
 helix-ipc/NOTICE                                |  33 ++
 helix-ipc/pom.xml                               | 141 ++++++
 helix-ipc/src/assemble/assembly.xml             |  60 +++
 helix-ipc/src/main/config/log4j.properties      |  31 ++
 .../org/apache/helix/ipc/HelixIPCCallback.java  |  32 ++
 .../helix/ipc/HelixIPCMessageManager.java       | 163 ++++++
 .../org/apache/helix/ipc/HelixIPCService.java   |  49 ++
 .../helix/ipc/netty/NettyHelixIPCService.java   | 490 +++++++++++++++++++
 .../helix/resolver/AbstractHelixResolver.java   | 295 +++++++++++
 .../org/apache/helix/resolver/HelixAddress.java |  70 +++
 .../helix/resolver/HelixMessageScope.java       | 151 ++++++
 .../apache/helix/resolver/HelixResolver.java    |  47 ++
 .../helix/resolver/ResolverRoutingTable.java    |  92 ++++
 .../helix/resolver/zk/ZKHelixResolver.java      |  45 ++
 .../helix/ipc/TestNettyHelixIPCService.java     | 353 +++++++++++++
 .../helix/ipc/benchmark/BenchmarkDriver.java    | 221 +++++++++
 .../helix/resolver/TestZKHelixResolver.java     | 161 ++++++
 helix-ipc/src/test/resources/build_benchmark.sh |  29 ++
 helix-ipc/src/test/resources/run_benchmark.sh   |  37 ++
 pom.xml                                         |   1 +
 23 files changed, 2806 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 7799ca1..ccce64a 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -38,7 +38,8 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.log4j.Logger;
 
-public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener {
+public class RoutingTableProvider implements ExternalViewChangeListener,
+    InstanceConfigChangeListener {
   private static final Logger logger = Logger.getLogger(RoutingTableProvider.class);
   private final AtomicReference<RoutingTable> _routingTableRef;
 
@@ -91,6 +92,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     return instanceSet;
   }
 
+  /**
+   * Get the configuration of an instance from its name
+   * @param instanceName the instance ID
+   * @return InstanceConfig if present, null otherwise
+   */
+  public InstanceConfig getInstanceConfig(String instanceName) {
+    return _routingTableRef.get().getConfig(instanceName);
+  }
+
   @Override
   public void onExternalViewChange(List<ExternalView> externalViewList,
       NotificationContext changeContext) {
@@ -124,12 +134,13 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
 
+    RoutingTable newRoutingTable = new RoutingTable();
     List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
     for (InstanceConfig config : configList) {
       instanceConfigMap.put(config.getId(), config);
+      newRoutingTable.addConfig(config);
     }
-    RoutingTable newRoutingTable = new RoutingTable();
     if (externalViewList != null) {
       for (ExternalView extView : externalViewList) {
         String resourceName = extView.getId();
@@ -154,9 +165,11 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
 
   class RoutingTable {
     private final HashMap<String, ResourceInfo> resourceInfoMap;
+    private final Map<String, InstanceConfig> instanceConfigMap;
 
     public RoutingTable() {
       resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>();
+      instanceConfigMap = new HashMap<String, InstanceConfig>();
     }
 
     public void addEntry(String resourceName, String partitionName, String state,
@@ -169,10 +182,17 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
 
     }
 
+    public void addConfig(InstanceConfig config) {
+      instanceConfigMap.put(config.getInstanceName(), config);
+    }
+
     ResourceInfo get(String resourceName) {
       return resourceInfoMap.get(resourceName);
     }
 
+    InstanceConfig getConfig(String instanceName) {
+      return instanceConfigMap.get(instanceName);
+    }
   }
 
   class ResourceInfo {

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 4a9139f..64796ba 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -20,7 +20,9 @@ package org.apache.helix;
  */
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Method;
+import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -769,4 +771,12 @@ public class TestHelper {
     System.out.println(sb.toString());
   }
 
+  public static int getRandomPort() throws IOException {
+    ServerSocket sock = new ServerSocket();
+    sock.bind(null);
+    int port = sock.getLocalPort();
+    sock.close();
+    return port;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/LICENSE
----------------------------------------------------------------------
diff --git a/helix-ipc/LICENSE b/helix-ipc/LICENSE
new file mode 100644
index 0000000..413913f
--- /dev/null
+++ b/helix-ipc/LICENSE
@@ -0,0 +1,273 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+
+For xstream:
+
+Copyright (c) 2003-2006, Joe Walnes
+Copyright (c) 2006-2009, 2011 XStream Committers
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or other materials provided
+with the distribution.
+
+3. Neither the name of XStream nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+for jline:
+
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/NOTICE
----------------------------------------------------------------------
diff --git a/helix-ipc/NOTICE b/helix-ipc/NOTICE
new file mode 100644
index 0000000..1ee0d24
--- /dev/null
+++ b/helix-ipc/NOTICE
@@ -0,0 +1,33 @@
+Apache Helix
+Copyright 2014 The Apache Software Foundation
+
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+Codehaus (http://www.codehaus.org/).
+Licensed under the BSD License.
+
+This product includes software developed at
+jline (http://jline.sourceforge.net/).
+Licensed under the BSD License.
+
+This product includes software developed at
+Google (http://www.google.com/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+snakeyaml (http://www.snakeyaml.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+zkclient (https://github.com/sgroschupf/zkclient).
+Licensed under the Apache License 2.0.
+
+II. License Summary
+- Apache License 2.0
+- BSD License

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/pom.xml
----------------------------------------------------------------------
diff --git a/helix-ipc/pom.xml b/helix-ipc/pom.xml
new file mode 100644
index 0000000..90ab11d
--- /dev/null
+++ b/helix-ipc/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.helix</groupId>
+    <artifactId>helix</artifactId>
+    <version>0.7.1-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>helix-ipc</artifactId>
+  <packaging>bundle</packaging>
+
+  <name>Apache Helix :: IPC</name>
+
+  <properties>
+    <osgi.import>
+      javax.management*,
+      org.apache.commons.math*;version="[2.1,3)",
+      org.apache.log4j*;version="[1.2,2)",
+      org.restlet;version="[2.1.4,3]",
+      *
+    </osgi.import>
+    <osgi.ignore>
+      org.apache.helix.tools*
+    </osgi.ignore>
+    <osgi.export>org.apache.helix*;version="${project.version};-noimport:=true</osgi.export>
+  </properties>
+
+  <dependencies>
+      <dependency>
+          <groupId>org.apache.helix</groupId>
+          <artifactId>helix-core</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+          <version>4.0.21.Final</version>
+      </dependency>
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+          <version>3.0.1</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.helix</groupId>
+          <artifactId>helix-core</artifactId>
+          <type>test-jar</type>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
+          <groupId>org.testng</groupId>
+          <artifactId>testng</artifactId>
+          <scope>test</scope>
+          <exclusions>
+              <exclusion>
+                  <groupId>junit</groupId>
+                  <artifactId>junit</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+  </dependencies>
+  <build>
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+      <resource>
+        <directory>${basedir}</directory>
+        <includes>
+          <include>DISCLAIMER</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <configuration>
+          <programs>
+            <program>
+              <mainClass>org.apache.helix.ipc.netty.NettyHelixIPCService</mainClass>
+              <name>netty-helix-ipc-service</name>
+            </program>
+          </programs>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <!--
+          <descriptors>
+            <descriptor>src/assemble/assembly.xml</descriptor>
+          </descriptors>
+          -->
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/helix-ipc/src/assemble/assembly.xml b/helix-ipc/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/helix-ipc/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+  <id>pkg</id>
+  <formats>
+    <format>tar</format>
+  </formats>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+      <outputDirectory>repo</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+      <excludes>
+        <exclude>**/*.xml</exclude>
+      </excludes>
+    </fileSet>
+     <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>LICENSE</include>
+        <include>NOTICE</include>
+        <include>DISCLAIMER</include>
+      </includes>
+      <fileMode>0755</fileMode>
+    </fileSet>
+  </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/config/log4j.properties b/helix-ipc/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/helix-ipc/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
new file mode 100644
index 0000000..9ea23ca
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
@@ -0,0 +1,32 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixMessageScope;
+
+import java.util.UUID;
+
+/**
+ * Callback registered per message type to handle messages.
+ */
+public interface HelixIPCCallback {
+  void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
new file mode 100644
index 0000000..89b10eb
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
@@ -0,0 +1,163 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A wrapper around a base IPC service that manages message retries / timeouts.
+ * <p>
+ * This class manages retries and timeouts, and can be used in the same way as a
+ * {@link org.apache.helix.ipc.HelixIPCService}.
+ * </p>
+ * <p>
+ * A message will be sent until the max number of retries has been reached (i.e. timed out), or it
+ * is acknowledged by the recipient. If the max number of retries is -1, it will be retried forever.
+ * </p>
+ * <p>
+ * A callback should be registered for every acknowledgement message type associated with any
+ * original message type sent by this class.
+ * </p>
+ * <p>
+ * For example, consider we have the two message types defined: DATA_REQ = 1, DATA_ACK = 2. One
+ * would do the following:
+ * 
+ * <pre>
+ * messageManager.registerCallback(DATA_ACK, new HelixIPCCallback() {
+ *   public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+ *     // Process ACK
+ *   }
+ * 
+ *   public void onError(HelixMessageScope scope, UUID messageId, Throwable cause) {
+ *     // Message error or timeout
+ *   }
+ * });
+ * 
+ * messageManager.send(destinations, DATA_REQ, messageId, data);
+ * </pre>
+ * 
+ * </p>
+ * <p>
+ * In send, we note messageId, and retry until we get a DATA_ACK for the same messageId. The
+ * callback registered with the message manager will only be called once, even if the message is
+ * acknowledged several times.
+ * </p>
+ */
+public class HelixIPCMessageManager implements HelixIPCService {
+
+  private static final Logger LOG = Logger.getLogger(HelixIPCMessageManager.class);
+
+  private final ScheduledExecutorService scheduler;
+  private final HelixIPCService baseIpcService;
+  private final long messageTimeoutMillis;
+  private final int maxNumRetries;
+  private final ConcurrentMap<UUID, Boolean> pendingMessages;
+  private final ConcurrentMap<UUID, AtomicInteger> retriesLeft;
+  private final ConcurrentMap<UUID, ByteBuf> messageBuffers;
+  private final ConcurrentMap<Integer, HelixIPCCallback> callbacks;
+
+  public HelixIPCMessageManager(ScheduledExecutorService scheduler, HelixIPCService baseIpcService,
+      long messageTimeoutMillis, int maxNumRetries) {
+    this.scheduler = scheduler;
+    this.baseIpcService = baseIpcService;
+    this.maxNumRetries = maxNumRetries;
+    this.messageTimeoutMillis = messageTimeoutMillis;
+    this.pendingMessages = new ConcurrentHashMap<UUID, Boolean>();
+    this.retriesLeft = new ConcurrentHashMap<UUID, AtomicInteger>();
+    this.messageBuffers = new ConcurrentHashMap<UUID, ByteBuf>();
+    this.callbacks = new ConcurrentHashMap<Integer, HelixIPCCallback>();
+  }
+
+  @Override
+  public void start() throws Exception {
+    baseIpcService.start();
+  }
+
+  @Override
+  public void shutdown() throws Exception {
+    baseIpcService.shutdown();
+  }
+
+  @Override
+  public void send(final HelixAddress destination, final int messageType, final UUID messageId,
+      final ByteBuf message) {
+    // State
+    pendingMessages.put(messageId, true);
+    retriesLeft.putIfAbsent(messageId, new AtomicInteger(maxNumRetries));
+    messageBuffers.put(messageId, message);
+
+    // Will free it when we've finally received response
+    message.retain();
+
+    // Send initial message
+    baseIpcService.send(destination, messageType, messageId, message);
+
+    // Retries
+    scheduler.schedule(new Runnable() {
+      @Override
+      public void run() {
+        Boolean isPending = pendingMessages.get(messageId);
+        AtomicInteger numLeft = retriesLeft.get(messageId);
+        if (numLeft != null && isPending != null && isPending) {
+          if (numLeft.decrementAndGet() > 0 || maxNumRetries == -1) {
+            // n.b. will schedule another retry
+            send(destination, messageType, messageId, message);
+          } else {
+            LOG.warn("Message " + messageId + " timed out after " + maxNumRetries + " retries");
+          }
+        }
+      }
+    }, messageTimeoutMillis, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void registerCallback(final int messageType, final HelixIPCCallback callback) {
+
+    // This callback will first check if the message is pending, then delegate to the provided
+    // callback if it has not yet done so.
+    HelixIPCCallback wrappedCallback = new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        if (pendingMessages.replace(messageId, true, false)) {
+          pendingMessages.remove(messageId);
+          ByteBuf originalMessage = messageBuffers.remove(messageId);
+          if (originalMessage != null) {
+            originalMessage.release();
+          }
+          retriesLeft.remove(messageId);
+          callback.onMessage(scope, messageId, message);
+        }
+      }
+    };
+
+    callbacks.put(messageType, wrappedCallback);
+    baseIpcService.registerCallback(messageType, wrappedCallback);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
new file mode 100644
index 0000000..6158514
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
@@ -0,0 +1,49 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixAddress;
+
+import java.util.UUID;
+
+/**
+ * Allows message passing among instances in Helix clusters.
+ * <p>
+ * Messages are sent asynchronously using {@link #send}, and handled by callbacks registered via
+ * {@link #registerCallback}
+ * </p>
+ */
+public interface HelixIPCService {
+
+  static final String IPC_PORT = "IPC_PORT";
+
+  /** Starts service (must call before {@link #send}) */
+  void start() throws Exception;
+
+  /** Shuts down service and releases any resources */
+  void shutdown() throws Exception;
+
+  /** Sends a message to one or more instances that map to a cluster scope. */
+  void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message);
+
+  /** Registers a callback for a given message type */
+  void registerCallback(int messageType, HelixIPCCallback callback);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
new file mode 100644
index 0000000..00d6157
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
@@ -0,0 +1,490 @@
+package org.apache.helix.ipc.netty;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.ipc.HelixIPCCallback;
+import org.apache.helix.ipc.HelixIPCService;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.Logger;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Provides partition/state-level messaging among nodes in a Helix cluster.
+ * <p>
+ * The message format is (where len == 4B, and contains the length of the next field)
+ * 
+ * <pre>
+ *      +----------------------+
+ *      | totalLength (4B)     |
+ *      +----------------------+
+ *      | version (4B)         |
+ *      +----------------------+
+ *      | messageType (4B)     |
+ *      +----------------------+
+ *      | messageId (16B)      |
+ *      +----------------------+
+ *      | len | cluster        |
+ *      +----------------------+
+ *      | len | resource       |
+ *      +----------------------+
+ *      | len | partition      |
+ *      +----------------------+
+ *      | len | state          |
+ *      +----------------------+
+ *      | len | srcInstance    |
+ *      +----------------------+
+ *      | len | dstInstance    |
+ *      +----------------------+
+ *      | len | message        |
+ *      +----------------------+
+ * </pre>
+ * 
+ * </p>
+ */
+public class NettyHelixIPCService implements HelixIPCService {
+
+  private static final Logger LOG = Logger.getLogger(NettyHelixIPCService.class);
+  private static final int MESSAGE_VERSION = 1;
+
+  // Parameters for length header field of message (tells decoder to interpret but preserve length
+  // field in message)
+  private static final int MAX_FRAME_LENGTH = 1024 * 1024;
+  private static final int LENGTH_FIELD_OFFSET = 0;
+  private static final int LENGTH_FIELD_LENGTH = 4;
+  private static final int LENGTH_ADJUSTMENT = -4;
+  private static final int INITIAL_BYTES_TO_STRIP = 0;
+  private static final int NUM_LENGTH_FIELDS = 7;
+
+  private final Config config;
+  private final AtomicBoolean isShutdown;
+  private final Map<InetSocketAddress, List<Channel>> channelMap;
+  private final ConcurrentMap<Channel, Long> channelOpenTimes;
+  private final MetricRegistry metricRegistry;
+  private final ConcurrentMap<Integer, HelixIPCCallback> callbacks;
+
+  private EventLoopGroup eventLoopGroup;
+  private Bootstrap clientBootstrap;
+  private Meter statTxMsg;
+  private Meter statRxMsg;
+  private Meter statTxBytes;
+  private Meter statRxBytes;
+  private Counter statChannelOpen;
+  private Counter statError;
+  private JmxReporter jmxReporter;
+
+  public NettyHelixIPCService(Config config) {
+    super();
+    this.config = config;
+    this.isShutdown = new AtomicBoolean(true);
+    this.channelMap = new HashMap<InetSocketAddress, List<Channel>>();
+    this.channelOpenTimes = new ConcurrentHashMap<Channel, Long>();
+    this.metricRegistry = new MetricRegistry();
+    this.callbacks = new ConcurrentHashMap<Integer, HelixIPCCallback>();
+  }
+
+  /**
+   * Starts message handling server, creates client bootstrap, and bootstraps partition routing
+   * table.
+   */
+  public void start() throws Exception {
+    if (isShutdown.getAndSet(false)) {
+      eventLoopGroup = new NioEventLoopGroup();
+
+      statTxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txMsg"));
+      statRxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxMsg"));
+      statTxBytes =
+          metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txBytes"));
+      statRxBytes =
+          metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxBytes"));
+      statChannelOpen =
+          metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "channelOpen"));
+      statError = metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "error"));
+
+      // Report metrics via JMX
+      jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
+      jmxReporter.start();
+
+      new ServerBootstrap().group(eventLoopGroup).channel(NioServerSocketChannel.class)
+          .option(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_KEEPALIVE, true)
+          .childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            protected void initChannel(SocketChannel socketChannel) throws Exception {
+              socketChannel.pipeline().addLast(
+                  new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET,
+                      LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
+              socketChannel.pipeline().addLast(new HelixIPCCallbackHandler());
+            }
+          }).bind(new InetSocketAddress(config.getPort()));
+
+      clientBootstrap =
+          new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class)
+              .option(ChannelOption.SO_KEEPALIVE, true)
+              .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel socketChannel) throws Exception {
+                  socketChannel.pipeline().addLast(
+                      new LengthFieldPrepender(LENGTH_FIELD_LENGTH, true));
+                }
+              });
+    }
+  }
+
+  /**
+   * Shuts down event loops for message handling server and message passing client.
+   */
+  public void shutdown() throws Exception {
+    if (!isShutdown.getAndSet(true)) {
+      jmxReporter.stop();
+      eventLoopGroup.shutdownGracefully();
+    }
+  }
+
+  /**
+   * Sends a message to all partitions with a given state in the cluster.
+   */
+  @Override
+  public void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message) {
+    // Send message
+    try {
+      // Get list of channels
+      List<Channel> channels = channelMap.get(destination.getSocketAddress());
+      if (channels == null) {
+        synchronized (channelMap) {
+          channels = channelMap.get(destination.getSocketAddress());
+          if (channels == null) {
+            channels = new ArrayList<Channel>(config.getNumConnections());
+            for (int i = 0; i < config.getNumConnections(); i++) {
+              channels.add(null);
+            }
+            channelMap.put(destination.getSocketAddress(), channels);
+          }
+        }
+      }
+
+      // Pick the channel for this scope
+      int idx = (Integer.MAX_VALUE & destination.getScope().hashCode()) % channels.size();
+      Channel channel = channels.get(idx);
+      if (channel == null || !channel.isOpen() || isExpired(channel)) {
+        synchronized (channelMap) {
+          channel = channels.get(idx);
+          if (channel == null || !channel.isOpen() || isExpired(channel)) {
+            if (channel != null && channel.isOpen()) {
+              channel.close();
+            }
+            channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel();
+            channels.set(idx, channel);
+            statChannelOpen.inc();
+            channelOpenTimes.put(channel, System.currentTimeMillis());
+          }
+        }
+      }
+
+      // Compute total length
+      int headerLength =
+          NUM_LENGTH_FIELDS
+              * (Integer.SIZE / 8)
+              + (Integer.SIZE / 8)
+              * 2 // version, type
+              + (Long.SIZE / 8)
+              * 2 // 128 bit UUID
+              + getLength(destination.getScope().getCluster())
+              + getLength(destination.getScope().getResource())
+              + getLength(destination.getScope().getPartition())
+              + getLength(destination.getScope().getState()) + getLength(config.getInstanceName())
+              + getLength(destination.getInstanceName());
+      int messageLength = message == null ? 0 : message.readableBytes();
+
+      // Build message header
+      ByteBuf headerBuf = channel.alloc().buffer(headerLength);
+      headerBuf.writeInt(MESSAGE_VERSION).writeInt(messageType)
+          .writeLong(messageId.getMostSignificantBits())
+          .writeLong(messageId.getLeastSignificantBits());
+      writeStringWithLength(headerBuf, destination.getScope().getCluster());
+      writeStringWithLength(headerBuf, destination.getScope().getResource());
+      writeStringWithLength(headerBuf, destination.getScope().getPartition());
+      writeStringWithLength(headerBuf, destination.getScope().getState());
+      writeStringWithLength(headerBuf, config.getInstanceName());
+      writeStringWithLength(headerBuf, destination.getInstanceName());
+
+      // Compose message header and payload
+      headerBuf.writeInt(messageLength);
+      CompositeByteBuf fullByteBuf = channel.alloc().compositeBuffer(2);
+      fullByteBuf.addComponent(headerBuf);
+      fullByteBuf.writerIndex(headerBuf.readableBytes());
+      if (message != null) {
+        fullByteBuf.addComponent(message);
+        fullByteBuf.writerIndex(fullByteBuf.writerIndex() + message.readableBytes());
+      }
+
+      // Send
+      statTxMsg.mark();
+      statTxBytes.mark(fullByteBuf.readableBytes());
+      channel.writeAndFlush(fullByteBuf);
+    } catch (Exception e) {
+      statError.inc();
+      throw new IllegalStateException("Could not send message to " + destination, e);
+    }
+  }
+
+  private boolean isExpired(Channel channel) {
+    Long channelOpenTime = channelOpenTimes.get(channel);
+    return channelOpenTime != null
+        && System.currentTimeMillis() - channelOpenTime >= config.getMaxChannelLifeMillis();
+  }
+
+  @Override
+  public void registerCallback(int messageType, HelixIPCCallback callback) {
+    callbacks.put(messageType, callback);
+  }
+
+  @ChannelHandler.Sharable
+  private class HelixIPCCallbackHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    HelixIPCCallbackHandler() {
+      super(false); // we will manage reference
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
+      try {
+        int idx = 0;
+
+        // Message length
+        int messageLength = byteBuf.readInt();
+        idx += 4;
+
+        // Message version
+        @SuppressWarnings("unused")
+        int messageVersion = byteBuf.readInt();
+        idx += 4;
+
+        // Message type
+        int messageType = byteBuf.readInt();
+        idx += 4;
+
+        // Message ID
+        UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong());
+        idx += 16;
+
+        // Cluster
+        byteBuf.readerIndex(idx);
+        int clusterSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("clusterSize", clusterSize, messageLength);
+        String clusterName = toNonEmptyString(clusterSize, byteBuf);
+        idx += clusterSize;
+
+        // Resource
+        byteBuf.readerIndex(idx);
+        int resourceSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("resourceSize", resourceSize, messageLength);
+        String resourceName = toNonEmptyString(resourceSize, byteBuf);
+        idx += resourceSize;
+
+        // Partition
+        byteBuf.readerIndex(idx);
+        int partitionSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("partitionSize", partitionSize, messageLength);
+        String partitionName = toNonEmptyString(partitionSize, byteBuf);
+        idx += partitionSize;
+
+        // State
+        byteBuf.readerIndex(idx);
+        int stateSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("stateSize", stateSize, messageLength);
+        String state = toNonEmptyString(stateSize, byteBuf);
+        idx += stateSize;
+
+        // Source instance
+        byteBuf.readerIndex(idx);
+        int srcInstanceSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("srcInstanceSize", srcInstanceSize, messageLength);
+        String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf);
+        idx += srcInstanceSize;
+
+        // Destination instance
+        byteBuf.readerIndex(idx);
+        int dstInstanceSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("dstInstanceSize", dstInstanceSize, messageLength);
+        String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf);
+        idx += dstInstanceSize;
+
+        // Position at message
+        byteBuf.readerIndex(idx + 4);
+
+        // Error check
+        if (dstInstance == null) {
+          throw new IllegalStateException("Received message addressed to null destination from "
+              + srcInstance);
+        } else if (!dstInstance.equals(config.getInstanceName())) {
+          throw new IllegalStateException(config.getInstanceName()
+              + " received message addressed to " + dstInstance + " from " + srcInstance);
+        } else if (callbacks.get(messageType) == null) {
+          throw new IllegalStateException("No callback registered for message type " + messageType);
+        }
+
+        // Build scope
+        HelixMessageScope scope =
+            new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName)
+                .partition(partitionName).state(state).sourceInstance(srcInstance).build();
+
+        // Get callback
+        HelixIPCCallback callback = callbacks.get(messageType);
+        if (callback == null) {
+          throw new IllegalStateException("No callback registered for message type " + messageType);
+        }
+
+        // Handle callback
+        callback.onMessage(scope, messageId, byteBuf);
+
+        // Stats
+        statRxMsg.mark();
+        statRxBytes.mark(messageLength);
+      } finally {
+        byteBuf.release();
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) {
+      LOG.error(cause);
+    }
+  }
+
+  /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */
+  private static String toNonEmptyString(int length, ByteBuf byteBuf) {
+    if (byteBuf.readableBytes() >= length) {
+      return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset());
+    }
+    return null;
+  }
+
+  /** Writes [s.length(), s] to buf, or [0] if s is null */
+  private static void writeStringWithLength(ByteBuf buf, String s) {
+    if (s == null) {
+      buf.writeInt(0);
+      return;
+    }
+
+    buf.writeInt(s.length());
+    for (int i = 0; i < s.length(); i++) {
+      buf.writeByte(s.charAt(i));
+    }
+  }
+
+  /** Returns the length of a string, or 0 if s is null */
+  private static int getLength(String s) {
+    return s == null ? 0 : s.length();
+  }
+
+  /**
+   * @throws java.lang.IllegalArgumentException if length > messageLength (attempt to prevent OOM
+   *           exceptions)
+   */
+  private static void checkLength(String fieldName, int length, int messageLength)
+      throws IllegalArgumentException {
+    if (length > messageLength) {
+      throw new IllegalArgumentException(fieldName + "=" + length
+          + " is greater than messageLength=" + messageLength);
+    }
+  }
+
+  public static class Config {
+    private String instanceName;
+    private int port;
+    private int numConnections = 1;
+    private long maxChannelLifeMillis = 5000;
+
+    public Config setInstanceName(String instanceName) {
+      this.instanceName = instanceName;
+      return this;
+    }
+
+    public Config setPort(int port) {
+      this.port = port;
+      return this;
+    }
+
+    public Config setNumConnections(int numConnections) {
+      this.numConnections = numConnections;
+      return this;
+    }
+
+    public Config setMaxChannelLifeMillis(long maxChannelLifeMillis) {
+      this.maxChannelLifeMillis = maxChannelLifeMillis;
+      return this;
+    }
+
+    public String getInstanceName() {
+      return instanceName;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    public int getNumConnections() {
+      return numConnections;
+    }
+
+    public long getMaxChannelLifeMillis() {
+      return maxChannelLifeMillis;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
new file mode 100644
index 0000000..c0fd2bb
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
@@ -0,0 +1,295 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A basic implementation of a resolver in terms of expiring routing tables
+ */
+public abstract class AbstractHelixResolver implements HelixResolver {
+  private static final Logger LOG = Logger.getLogger(AbstractHelixResolver.class);
+  private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+  private static final long DEFAULT_LEASE_LENGTH_MS = 60 * 60 * 1000; // TODO: are these good
+                                                                      // values?
+  private static final String IPC_PORT = "IPC_PORT";
+  private final Map<String, Spectator> _connections;
+  private boolean _isConnected;
+  private ScheduledExecutorService _executor;
+
+  protected AbstractHelixResolver() {
+    _connections = Maps.newHashMap();
+    _isConnected = false;
+  }
+
+  @Override
+  public void connect() {
+    _executor = Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    _isConnected = true;
+  }
+
+  @Override
+  public void disconnect() {
+    synchronized (_connections) {
+      for (Spectator connection : _connections.values()) {
+        connection.shutdown();
+      }
+      _connections.clear();
+    }
+    _executor.shutdown();
+    _isConnected = false;
+  }
+
+  @Override
+  public Set<HelixAddress> getDestinations(HelixMessageScope scope) {
+    if (!scope.isValid()) {
+      LOG.error("Scope " + scope + " is not valid!");
+      return new HashSet<HelixAddress>();
+    } else if (!_isConnected) {
+      LOG.error("Cannot resolve " + scope + " without first connecting!");
+      return new HashSet<HelixAddress>();
+    }
+
+    // Connect or refresh connection
+    String cluster = scope.getCluster();
+    ResolverRoutingTable routingTable;
+    Spectator connection = _connections.get(cluster);
+    if (connection == null || !connection.getManager().isConnected()) {
+      synchronized (_connections) {
+        connection = _connections.get(cluster);
+        if (connection == null || !connection.getManager().isConnected()) {
+          connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS);
+          connection.init();
+          _connections.put(cluster, connection);
+        }
+      }
+    }
+    routingTable = connection.getRoutingTable();
+
+    // Resolve all resources, either explicitly or match all
+    Set<String> resources;
+    if (scope.getResource() != null) {
+      resources = Sets.newHashSet(scope.getResource());
+    } else {
+      resources = routingTable.getResources();
+    }
+
+    // Resolve all partitions
+    Map<String, Set<String>> partitionMap = Maps.newHashMap();
+    if (scope.getPartition() != null) {
+      for (String resource : resources) {
+        partitionMap.put(resource, Sets.newHashSet(scope.getPartition()));
+      }
+    } else {
+      for (String resource : resources) {
+        partitionMap.put(resource, routingTable.getPartitions(resource));
+      }
+    }
+
+    // Resolve all states
+    Set<String> states;
+    if (scope.getState() != null) {
+      states = Sets.newHashSet(scope.getState());
+    } else {
+      states = routingTable.getStates();
+    }
+
+    // Get all the participants that match
+    Set<InstanceConfig> participants = Sets.newHashSet();
+    for (String resource : resources) {
+      for (String partition : partitionMap.get(resource)) {
+        for (String state : states) {
+          participants.addAll(routingTable.getInstances(resource, partition, state));
+        }
+      }
+    }
+
+    // Resolve those participants
+    Set<HelixAddress> result = new HashSet<HelixAddress>();
+    for (InstanceConfig participant : participants) {
+      String ipcPort = participant.getRecord().getSimpleField(IPC_PORT);
+      if (ipcPort == null) {
+        LOG.error("No ipc address registered for target instance " + participant.getInstanceName()
+            + ", skipping");
+      } else {
+        result.add(new HelixAddress(scope, participant.getInstanceName(), new InetSocketAddress(
+            participant.getHostName(), Integer.valueOf(ipcPort))));
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public HelixAddress getSource(HelixMessageScope scope) {
+    // Connect or refresh connection
+    String cluster = scope.getCluster();
+    ResolverRoutingTable routingTable;
+    Spectator connection = _connections.get(cluster);
+    if (connection == null || !connection.getManager().isConnected()) {
+      synchronized (_connections) {
+        connection = _connections.get(cluster);
+        if (connection == null || !connection.getManager().isConnected()) {
+          connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS);
+          connection.init();
+          _connections.put(cluster, connection);
+        }
+      }
+    }
+    routingTable = connection.getRoutingTable();
+
+    if (scope.getSourceInstance() != null) {
+      InstanceConfig config = routingTable.getInstanceConfig(scope.getSourceInstance());
+      String ipcPort = config.getRecord().getSimpleField(IPC_PORT);
+      if (ipcPort == null) {
+        throw new IllegalStateException("No IPC address registered for source instance "
+            + scope.getSourceInstance());
+      }
+      return new HelixAddress(scope, scope.getSourceInstance(), new InetSocketAddress(
+          config.getHostName(), Integer.valueOf(ipcPort)));
+    }
+
+    return null;
+  }
+
+  @Override
+  public boolean isConnected() {
+    return _isConnected;
+  }
+
+  /**
+   * Create a Helix manager connection based on the appropriate backing store
+   * @param cluster the name of the cluster to connect to
+   * @return HelixManager instance
+   */
+  protected abstract HelixManager createManager(String cluster);
+
+  private class Spectator {
+    private final String _cluster;
+    private final HelixManager _manager;
+    private final ResolverRoutingTable _routingTable;
+    private final long _leaseLengthMs;
+    private ScheduledFuture<?> _future;
+
+    /**
+     * Initialize a spectator. This does not automatically connect.
+     * @param cluster the cluster to spectate
+     * @param leaseLengthMs the expiry of this spectator after the last request
+     */
+    public Spectator(String cluster, long leaseLengthMs) {
+      _cluster = cluster;
+      _manager = createManager(cluster);
+      _leaseLengthMs = leaseLengthMs;
+      _routingTable = new ResolverRoutingTable();
+    }
+
+    /**
+     * Connect and initialize the routing table
+     */
+    public void init() {
+      try {
+        _manager.connect();
+        _manager.addExternalViewChangeListener(_routingTable);
+        _manager.addInstanceConfigChangeListener(_routingTable);
+
+        // Force an initial refresh
+        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+        List<ExternalView> externalViews =
+            accessor.getChildValues(accessor.keyBuilder().externalViews());
+        NotificationContext context = new NotificationContext(_manager);
+        context.setType(NotificationContext.Type.INIT);
+        _routingTable.onExternalViewChange(externalViews, context);
+        List<InstanceConfig> instanceConfigs =
+            accessor.getChildValues(accessor.keyBuilder().instanceConfigs());
+        _routingTable.onInstanceConfigChange(instanceConfigs, context);
+      } catch (Exception e) {
+        LOG.error("Error setting up routing table", e);
+      }
+    }
+
+    /**
+     * Clean up the connection to the spectated cluster
+     */
+    public void shutdown() {
+      resetFuture();
+      expire();
+    }
+
+    /**
+     * Get the dynamically-updating routing table for this cluster
+     * @return ResolverRoutingTable, a RoutingTableProvider that can answer questions about its
+     *         contents
+     */
+    public ResolverRoutingTable getRoutingTable() {
+      renew();
+      return _routingTable;
+    }
+
+    public HelixManager getManager() {
+      return _manager;
+    }
+
+    private synchronized void renew() {
+      resetFuture();
+
+      // Schedule this connection to expire if not renewed quickly enough
+      _future = _executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          expire();
+        }
+      }, _leaseLengthMs, TimeUnit.MILLISECONDS);
+
+    }
+
+    private synchronized void resetFuture() {
+      if (_future != null && !_future.isDone()) {
+        _future.cancel(true);
+      }
+    }
+
+    private void expire() {
+      synchronized (_connections) {
+        _connections.remove(_cluster);
+        if (_manager != null && _manager.isConnected()) {
+          _manager.disconnect();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
new file mode 100644
index 0000000..657e8bd
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
@@ -0,0 +1,70 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Objects;
+
+import java.net.InetSocketAddress;
+
+public class HelixAddress {
+
+  private final HelixMessageScope scope;
+  private final String instanceName;
+  private final InetSocketAddress socketAddress;
+
+  public HelixAddress(HelixMessageScope scope, String instanceName, InetSocketAddress socketAddress) {
+    this.scope = scope;
+    this.instanceName = instanceName;
+    this.socketAddress = socketAddress;
+  }
+
+  public HelixMessageScope getScope() {
+    return scope;
+  }
+
+  public String getInstanceName() {
+    return instanceName;
+  }
+
+  public InetSocketAddress getSocketAddress() {
+    return socketAddress;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this).addValue(scope).addValue(instanceName)
+        .addValue(socketAddress).toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(scope, instanceName, socketAddress);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof HelixAddress)) {
+      return false;
+    }
+    HelixAddress a = (HelixAddress) o;
+    return a.getScope().equals(scope) && a.getInstanceName().equals(instanceName)
+        && a.getSocketAddress().equals(socketAddress);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
new file mode 100644
index 0000000..ba06556
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
@@ -0,0 +1,151 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Objects;
+
+/**
+ * A definition of the addressing scope of a message.
+ */
+public class HelixMessageScope {
+  private final String _cluster;
+  private final String _resource;
+  private final String _partition;
+  private final String _state;
+  private final String _srcInstance;
+
+  private HelixMessageScope(String cluster, String resource, String partition, String state,
+      String srcInstance) {
+    _cluster = cluster;
+    _resource = resource;
+    _partition = partition;
+    _state = state;
+    _srcInstance = srcInstance;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this).addValue(_cluster).addValue(_resource).addValue(_partition)
+        .addValue(_state).toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(_cluster, _resource, _partition, _state);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof HelixMessageScope) {
+      HelixMessageScope that = (HelixMessageScope) other;
+      return Objects.equal(_cluster, that._cluster) && Objects.equal(_resource, that._resource)
+          && Objects.equal(_partition, that._partition) && Objects.equal(_state, that._state);
+    }
+    return false;
+  }
+
+  public String getCluster() {
+    return _cluster;
+  }
+
+  public String getResource() {
+    return _resource;
+  }
+
+  public String getPartition() {
+    return _partition;
+  }
+
+  public String getState() {
+    return _state;
+  }
+
+  public String getSourceInstance() {
+    return _srcInstance;
+  }
+
+  public boolean isValid() {
+    return _cluster != null && ((_partition != null && _resource != null) || _partition == null);
+  }
+
+  /**
+   * Creator for a HelixMessageScope
+   */
+  public static class Builder {
+    private String _cluster;
+    private String _resource;
+    private String _partition;
+    private String _state;
+    private String _sourceInstance;
+
+    /**
+     * Associate the scope with a cluster
+     * @param cluster the cluster to scope routing to
+     * @return Builder
+     */
+    public Builder cluster(String cluster) {
+      _cluster = cluster;
+      return this;
+    }
+
+    /**
+     * Associate the scope with a resource
+     * @param resource a resource served by the cluster
+     * @return Builder
+     */
+    public Builder resource(String resource) {
+      _resource = resource;
+      return this;
+    }
+
+    /**
+     * Associate the scope with a partition
+     * @param partition a specific partition of the scoped resource
+     * @return Builder
+     */
+    public Builder partition(String partition) {
+      _partition = partition;
+      return this;
+    }
+
+    /**
+     * Associate the scope with a state
+     * @param state a state that a resource in the cluster can be in
+     * @return Builder
+     */
+    public Builder state(String state) {
+      _state = state;
+      return this;
+    }
+
+    public Builder sourceInstance(String sourceInstance) {
+      _sourceInstance = sourceInstance;
+      return this;
+    }
+
+    /**
+     * Create the scope
+     * @return HelixMessageScope instance corresponding to the built scope
+     */
+    public HelixMessageScope build() {
+      return new HelixMessageScope(_cluster, _resource, _partition, _state, _sourceInstance);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
new file mode 100644
index 0000000..b7f2f3e
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
@@ -0,0 +1,47 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+
+/**
+ * An interface that resolves a message scope to a direct address.
+ */
+public interface HelixResolver {
+  /**
+   * Initialize a connection for scope resolution.
+   */
+  void connect();
+
+  /**
+   * Tear down any state and open connections to Helix clusters.
+   */
+  void disconnect();
+
+  /**
+   * Check the connection status
+   * @return true if connected, false otherwise
+   */
+  boolean isConnected();
+
+  Set<HelixAddress> getDestinations(HelixMessageScope scope);
+
+  HelixAddress getSource(HelixMessageScope scope);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
new file mode 100644
index 0000000..2bc5413
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
@@ -0,0 +1,92 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.spectator.RoutingTableProvider;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A routing table that can also return all resources, partitions, and states in the cluster
+ */
+public class ResolverRoutingTable extends RoutingTableProvider {
+  Map<String, Set<String>> _resourceMap;
+  Set<String> _stateSet;
+
+  /**
+   * Create the table.
+   */
+  public ResolverRoutingTable() {
+    super();
+    _resourceMap = Maps.newHashMap();
+    _stateSet = Sets.newHashSet();
+  }
+
+  /**
+   * Get all resources that are currently served in the cluster.
+   * @return set of resource names
+   */
+  public synchronized Set<String> getResources() {
+    return Sets.newHashSet(_resourceMap.keySet());
+  }
+
+  /**
+   * Get all partitions currently served for a resource.
+   * @param resource the resource for which to look up partitions
+   * @return set of partition names
+   */
+  public synchronized Set<String> getPartitions(String resource) {
+    if (_resourceMap.containsKey(resource)) {
+      return Sets.newHashSet(_resourceMap.get(resource));
+    } else {
+      return Collections.emptySet();
+    }
+  }
+
+  /**
+   * Get all states that partitions of all resources are currently in
+   * @return set of state names
+   */
+  public synchronized Set<String> getStates() {
+    return Sets.newHashSet(_stateSet);
+  }
+
+  @Override
+  public synchronized void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext) {
+    super.onExternalViewChange(externalViewList, changeContext);
+    _resourceMap.clear();
+    _stateSet.clear();
+    for (ExternalView externalView : externalViewList) {
+      _resourceMap.put(externalView.getResourceName(), externalView.getPartitionSet());
+      for (String partition : externalView.getPartitionSet()) {
+        _stateSet.addAll(externalView.getStateMap(partition).values());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
new file mode 100644
index 0000000..48fa81a
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
@@ -0,0 +1,45 @@
+package org.apache.helix.resolver.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.resolver.AbstractHelixResolver;
+
+/**
+ * A ZooKeeper-specific {@link org.apache.helix.resolver.HelixResolver}
+ */
+public class ZKHelixResolver extends AbstractHelixResolver {
+  private final String _zkAddress;
+
+  /**
+   * Create a ZK-based Helix resolver
+   * @param zkConnectString the connection string to the ZooKeeper ensemble
+   */
+  public ZKHelixResolver(String zkConnectString) {
+    _zkAddress = zkConnectString;
+  }
+
+  @Override
+  protected HelixManager createManager(String cluster) {
+    return HelixManagerFactory.getZKHelixManager(cluster, null, InstanceType.SPECTATOR, _zkAddress);
+  }
+}


[3/4] git commit: Removed extraneous files in helix-ipc

Posted by ka...@apache.org.
Removed extraneous files in helix-ipc

Removed helix-ipc/LICENSE as per @hsaputra recommendation.

Removed some scripts intended to generate benchmark executables.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3bd83a3c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3bd83a3c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3bd83a3c

Branch: refs/heads/master
Commit: 3bd83a3c090a004337b8f6900caeed20de5d4cde
Parents: f2475fa
Author: Greg Brandt <br...@gmail.com>
Authored: Tue Aug 26 15:57:03 2014 -0700
Committer: Greg Brandt <br...@gmail.com>
Committed: Tue Aug 26 15:57:03 2014 -0700

----------------------------------------------------------------------
 helix-ipc/LICENSE                               | 273 -------------------
 helix-ipc/src/test/resources/build_benchmark.sh |  29 --
 helix-ipc/src/test/resources/run_benchmark.sh   |  37 ---
 3 files changed, 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3bd83a3c/helix-ipc/LICENSE
----------------------------------------------------------------------
diff --git a/helix-ipc/LICENSE b/helix-ipc/LICENSE
deleted file mode 100644
index 413913f..0000000
--- a/helix-ipc/LICENSE
+++ /dev/null
@@ -1,273 +0,0 @@
-
-                                 Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship. For the purposes
-      of this License, Derivative Works shall not include works that remain
-      separable from, or merely link (or bind by name) to the interfaces of,
-      the Work and Derivative Works thereof.
-
-      "Contribution" shall mean any work of authorship, including
-      the original version of the Work and any modifications or additions
-      to that Work or Derivative Works thereof, that is intentionally
-      submitted to Licensor for inclusion in the Work by the copyright owner
-      or by an individual or Legal Entity authorized to submit on behalf of
-      the copyright owner. For the purposes of this definition, "submitted"
-      means any form of electronic, verbal, or written communication sent
-      to the Licensor or its representatives, including but not limited to
-      communication on electronic mailing lists, source code control systems,
-      and issue tracking systems that are managed by, or on behalf of, the
-      Licensor for the purpose of discussing and improving the Work, but
-      excluding communication that is conspicuously marked or otherwise
-      designated in writing by the copyright owner as "Not a Contribution."
-
-      "Contributor" shall mean Licensor and any individual or Legal Entity
-      on behalf of whom a Contribution has been received by Licensor and
-      subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      copyright license to reproduce, prepare Derivative Works of,
-      publicly display, publicly perform, sublicense, and distribute the
-      Work and such Derivative Works in Source or Object form.
-
-   3. Grant of Patent License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      (except as stated in this section) patent license to make, have made,
-      use, offer to sell, sell, import, and otherwise transfer the Work,
-      where such license applies only to those patent claims licensable
-      by such Contributor that are necessarily infringed by their
-      Contribution(s) alone or by combination of their Contribution(s)
-      with the Work to which such Contribution(s) was submitted. If You
-      institute patent litigation against any entity (including a
-      cross-claim or counterclaim in a lawsuit) alleging that the Work
-      or a Contribution incorporated within the Work constitutes direct
-      or contributory patent infringement, then any patent licenses
-      granted to You under this License for that Work shall terminate
-      as of the date such litigation is filed.
-
-   4. Redistribution. You may reproduce and distribute copies of the
-      Work or Derivative Works thereof in any medium, with or without
-      modifications, and in Source or Object form, provided that You
-      meet the following conditions:
-
-      (a) You must give any other recipients of the Work or
-          Derivative Works a copy of this License; and
-
-      (b) You must cause any modified files to carry prominent notices
-          stating that You changed the files; and
-
-      (c) You must retain, in the Source form of any Derivative Works
-          that You distribute, all copyright, patent, trademark, and
-          attribution notices from the Source form of the Work,
-          excluding those notices that do not pertain to any part of
-          the Derivative Works; and
-
-      (d) If the Work includes a "NOTICE" text file as part of its
-          distribution, then any Derivative Works that You distribute must
-          include a readable copy of the attribution notices contained
-          within such NOTICE file, excluding those notices that do not
-          pertain to any part of the Derivative Works, in at least one
-          of the following places: within a NOTICE text file distributed
-          as part of the Derivative Works; within the Source form or
-          documentation, if provided along with the Derivative Works; or,
-          within a display generated by the Derivative Works, if and
-          wherever such third-party notices normally appear. The contents
-          of the NOTICE file are for informational purposes only and
-          do not modify the License. You may add Your own attribution
-          notices within Derivative Works that You distribute, alongside
-          or as an addendum to the NOTICE text from the Work, provided
-          that such additional attribution notices cannot be construed
-          as modifying the License.
-
-      You may add Your own copyright statement to Your modifications and
-      may provide additional or different license terms and conditions
-      for use, reproduction, or distribution of Your modifications, or
-      for any such Derivative Works as a whole, provided Your use,
-      reproduction, and distribution of the Work otherwise complies with
-      the conditions stated in this License.
-
-   5. Submission of Contributions. Unless You explicitly state otherwise,
-      any Contribution intentionally submitted for inclusion in the Work
-      by You to the Licensor shall be under the terms and conditions of
-      this License, without any additional terms or conditions.
-      Notwithstanding the above, nothing herein shall supersede or modify
-      the terms of any separate license agreement you may have executed
-      with Licensor regarding such Contributions.
-
-   6. Trademarks. This License does not grant permission to use the trade
-      names, trademarks, service marks, or product names of the Licensor,
-      except as required for reasonable and customary use in describing the
-      origin of the Work and reproducing the content of the NOTICE file.
-
-   7. Disclaimer of Warranty. Unless required by applicable law or
-      agreed to in writing, Licensor provides the Work (and each
-      Contributor provides its Contributions) on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-      implied, including, without limitation, any warranties or conditions
-      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
-      PARTICULAR PURPOSE. You are solely responsible for determining the
-      appropriateness of using or redistributing the Work and assume any
-      risks associated with Your exercise of permissions under this License.
-
-   8. Limitation of Liability. In no event and under no legal theory,
-      whether in tort (including negligence), contract, or otherwise,
-      unless required by applicable law (such as deliberate and grossly
-      negligent acts) or agreed to in writing, shall any Contributor be
-      liable to You for damages, including any direct, indirect, special,
-      incidental, or consequential damages of any character arising as a
-      result of this License or out of the use or inability to use the
-      Work (including but not limited to damages for loss of goodwill,
-      work stoppage, computer failure or malfunction, or any and all
-      other commercial damages or losses), even if such Contributor
-      has been advised of the possibility of such damages.
-
-   9. Accepting Warranty or Additional Liability. While redistributing
-      the Work or Derivative Works thereof, You may choose to offer,
-      and charge a fee for, acceptance of support, warranty, indemnity,
-      or other liability obligations and/or rights consistent with this
-      License. However, in accepting such obligations, You may act only
-      on Your own behalf and on Your sole responsibility, not on behalf
-      of any other Contributor, and only if You agree to indemnify,
-      defend, and hold each Contributor harmless for any liability
-      incurred by, or claims asserted against, such Contributor by reason
-      of your accepting any such warranty or additional liability.
-
-   END OF TERMS AND CONDITIONS
-
-   APPENDIX: How to apply the Apache License to your work.
-
-      To apply the Apache License to your work, attach the following
-      boilerplate notice, with the fields enclosed by brackets "[]"
-      replaced with your own identifying information. (Don't include
-      the brackets!)  The text should be enclosed in the appropriate
-      comment syntax for the file format. We also recommend that a
-      file or class name and description of purpose be included on the
-      same "printed page" as the copyright notice for easier
-      identification within third-party archives.
-
-   Copyright [yyyy] [name of copyright owner]
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-
-
-
-For xstream:
-
-Copyright (c) 2003-2006, Joe Walnes
-Copyright (c) 2006-2009, 2011 XStream Committers
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice, this list of
-conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice, this list of
-conditions and the following disclaimer in the documentation and/or other materials provided
-with the distribution.
-
-3. Neither the name of XStream nor the names of its contributors may be used to endorse
-or promote products derived from this software without specific prior written
-permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
-EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
-OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
-SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
-WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
-DAMAGE.
-
-for jline:
-
-Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or
-without modification, are permitted provided that the following
-conditions are met:
-
-Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
-
-Redistributions in binary form must reproduce the above copyright
-notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with
-the distribution.
-
-Neither the name of JLine nor the names of its contributors
-may be used to endorse or promote products derived from this
-software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
-BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
-AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
-EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
-OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
-IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
-

http://git-wip-us.apache.org/repos/asf/helix/blob/3bd83a3c/helix-ipc/src/test/resources/build_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/build_benchmark.sh b/helix-ipc/src/test/resources/build_benchmark.sh
deleted file mode 100644
index 8b94e38..0000000
--- a/helix-ipc/src/test/resources/build_benchmark.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-cd helix-ipc
-mvn package -DskipTests
-mkdir -p /tmp/ipc-benchmark
-cp target/helix-ipc-0.7-1-jar-with-dependencies.jar /tmp/ipc-benchmark
-cp target/helix-ipc-0.7.1-tests.jar /tmp/ipc-benchmark
-cp src/test/resources/run_benchmark.sh /tmp/ipc-benchmark
-cd /tmp
-tar cvzf ipc-benchmark.tar.gz ipc-benchmark

http://git-wip-us.apache.org/repos/asf/helix/blob/3bd83a3c/helix-ipc/src/test/resources/run_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/run_benchmark.sh b/helix-ipc/src/test/resources/run_benchmark.sh
deleted file mode 100644
index 0b9373d..0000000
--- a/helix-ipc/src/test/resources/run_benchmark.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-$JAVA_HOME/bin/java -cp helix-ipc-0.7.1-jar-with-dependencies.jar:helix-ipc-0.7.1-tests.jar \
-    -Xloggc:gc_$JMX_PORT.log \
-    -XX:+UseConcMarkSweepGC \
-    -XX:+UseCMSInitiatingOccupancyOnly \
-    -XX:CMSInitiatingOccupancyFraction=65 \
-    -XX:NewRatio=2 \
-    -Xmx4g \
-    -Xms4g \
-    -Dcom.sun.management.jmxremote \
-    -Dcom.sun.management.jmxremote.port=$JMX_PORT \
-    -Dcom.sun.management.jmxremote.authenticate=false \
-    -Dcom.sun.management.jmxremote.ssl=false \
-    -Dio.netty.resourceLeakDetection \
-    -Dio.netty.allocator.type=pooled \
-    -Dio.netty.noPreferDirect=false \
-    org.apache.helix.ipc.benchmark.BenchmarkDriver $@