You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:38 UTC

[22/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/internal/ControllerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/internal/ControllerTest.java b/core/src/test/java/org/apache/twill/internal/ControllerTest.java
deleted file mode 100644
index 382dc95..0000000
--- a/core/src/test/java/org/apache/twill/internal/ControllerTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClientService;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Service;
-import com.google.gson.JsonObject;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- *
- */
-public class ControllerTest {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ControllerTest.class);
-
-  @Test
-  public void testController() throws ExecutionException, InterruptedException, TimeoutException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
-    zkServer.startAndWait();
-
-    LOG.info("ZKServer: " + zkServer.getConnectionStr());
-
-    try {
-      RunId runId = RunIds.generate();
-      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      zkClientService.startAndWait();
-
-      Service service = createService(zkClientService, runId);
-      service.startAndWait();
-
-      TwillController controller = getController(zkClientService, runId);
-      controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
-      controller.stop().get(2, TimeUnit.SECONDS);
-
-      Assert.assertEquals(ServiceController.State.TERMINATED, controller.state());
-
-      final CountDownLatch terminateLatch = new CountDownLatch(1);
-      service.addListener(new ServiceListenerAdapter() {
-        @Override
-        public void terminated(Service.State from) {
-          terminateLatch.countDown();
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-
-      Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS));
-
-      zkClientService.stopAndWait();
-
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  // Test controller created before service starts.
-  @Test
-  public void testControllerBefore() throws InterruptedException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
-    zkServer.startAndWait();
-
-    LOG.info("ZKServer: " + zkServer.getConnectionStr());
-    try {
-      RunId runId = RunIds.generate();
-      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      zkClientService.startAndWait();
-
-      final CountDownLatch runLatch = new CountDownLatch(1);
-      final CountDownLatch stopLatch = new CountDownLatch(1);
-      TwillController controller = getController(zkClientService, runId);
-      controller.addListener(new ServiceListenerAdapter() {
-        @Override
-        public void running() {
-          runLatch.countDown();
-        }
-
-        @Override
-        public void terminated(Service.State from) {
-          stopLatch.countDown();
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-
-      Service service = createService(zkClientService, runId);
-      service.start();
-
-      Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
-      Assert.assertFalse(stopLatch.await(2, TimeUnit.SECONDS));
-
-      service.stop();
-
-      Assert.assertTrue(stopLatch.await(2, TimeUnit.SECONDS));
-
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  // Test controller listener receive first state change without state transition from service
-  @Test
-  public void testControllerListener() throws InterruptedException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
-    zkServer.startAndWait();
-
-    LOG.info("ZKServer: " + zkServer.getConnectionStr());
-    try {
-      RunId runId = RunIds.generate();
-      ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      zkClientService.startAndWait();
-
-      Service service = createService(zkClientService, runId);
-      service.startAndWait();
-
-      final CountDownLatch runLatch = new CountDownLatch(1);
-      TwillController controller = getController(zkClientService, runId);
-      controller.addListener(new ServiceListenerAdapter() {
-        @Override
-        public void running() {
-          runLatch.countDown();
-        }
-      }, Threads.SAME_THREAD_EXECUTOR);
-
-      Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
-
-      service.stopAndWait();
-
-      zkClientService.stopAndWait();
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  private Service createService(ZKClient zkClient, RunId runId) {
-    return new ZKServiceDecorator(
-      zkClient, runId, Suppliers.ofInstance(new JsonObject()), new AbstractIdleService() {
-
-      @Override
-      protected void startUp() throws Exception {
-        LOG.info("Start");
-      }
-
-      @Override
-      protected void shutDown() throws Exception {
-        LOG.info("Stop");
-      }
-    });
-  }
-
-  private TwillController getController(ZKClient zkClient, RunId runId) {
-    TwillController controller = new AbstractTwillController(runId, zkClient, ImmutableList.<LogHandler>of()) {
-
-      @Override
-      public void kill() {
-        // No-op
-      }
-
-      @Override
-      protected void instanceNodeUpdated(NodeData nodeData) {
-        // No-op
-      }
-
-      @Override
-      protected void stateNodeUpdated(StateNode stateNode) {
-        // No-op
-      }
-
-      @Override
-      public ResourceReport getResourceReport() {
-        return null;
-      }
-    };
-    controller.startAndWait();
-    return controller;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java b/core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
deleted file mode 100644
index d267cf8..0000000
--- a/core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-import com.google.common.collect.ImmutableMap;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- *
- */
-public class MessageCodecTest {
-
-  @Test
-  public void testCodec() {
-    Message message = MessageCodec.decode(MessageCodec.encode(new Message() {
-
-      @Override
-      public Type getType() {
-        return Type.SYSTEM;
-      }
-
-      @Override
-      public Scope getScope() {
-        return Scope.APPLICATION;
-      }
-
-      @Override
-      public String getRunnableName() {
-        return null;
-      }
-
-      @Override
-      public Command getCommand() {
-        return new Command() {
-          @Override
-          public String getCommand() {
-            return "stop";
-          }
-
-          @Override
-          public Map<String, String> getOptions() {
-            return ImmutableMap.of("timeout", "1", "timeoutUnit", "SECONDS");
-          }
-        };
-      }
-    }));
-
-    Assert.assertEquals(Message.Type.SYSTEM, message.getType());
-    Assert.assertEquals(Message.Scope.APPLICATION, message.getScope());
-    Assert.assertNull(message.getRunnableName());
-    Assert.assertEquals("stop", message.getCommand().getCommand());
-    Assert.assertEquals(ImmutableMap.of("timeout", "1", "timeoutUnit", "SECONDS"), message.getCommand().getOptions());
-  }
-
-  @Test
-  public void testFailureDecode() {
-    Assert.assertNull(MessageCodec.decode("".getBytes()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java b/core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
deleted file mode 100644
index 47d8562..0000000
--- a/core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.ZKServiceDecorator;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Suppliers;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Service;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- *
- */
-public class ZKServiceDecoratorTest {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecoratorTest.class);
-
-  @Test
-  public void testStateTransition() throws InterruptedException, ExecutionException, TimeoutException {
-    InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
-    zkServer.startAndWait();
-
-    try {
-      final String namespace = Joiner.on('/').join("/twill", RunIds.generate(), "runnables", "Runner1");
-
-      final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-      zkClient.startAndWait();
-      zkClient.create(namespace, null, CreateMode.PERSISTENT).get();
-
-      try {
-        JsonObject content = new JsonObject();
-        content.addProperty("containerId", "container-123");
-        content.addProperty("host", "localhost");
-
-        RunId runId = RunIds.generate();
-        final Semaphore semaphore = new Semaphore(0);
-        ZKServiceDecorator service = new ZKServiceDecorator(ZKClients.namespace(zkClient, namespace),
-                                                            runId, Suppliers.ofInstance(content),
-                                                            new AbstractIdleService() {
-          @Override
-          protected void startUp() throws Exception {
-            Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to start");
-          }
-
-          @Override
-          protected void shutDown() throws Exception {
-            Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to stop");
-          }
-        });
-
-        final String runnablePath = namespace + "/" + runId.getId();
-        final AtomicReference<String> stateMatch = new AtomicReference<String>("STARTING");
-        watchDataChange(zkClient, runnablePath + "/state", semaphore, stateMatch);
-        Assert.assertEquals(Service.State.RUNNING, service.start().get(5, TimeUnit.SECONDS));
-
-        stateMatch.set("STOPPING");
-        Assert.assertEquals(Service.State.TERMINATED, service.stop().get(5, TimeUnit.SECONDS));
-
-      } finally {
-        zkClient.stopAndWait();
-      }
-    } finally {
-      zkServer.stopAndWait();
-    }
-  }
-
-  private void watchDataChange(final ZKClientService zkClient, final String path,
-                               final Semaphore semaphore, final AtomicReference<String> stateMatch) {
-    Futures.addCallback(zkClient.getData(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getType() == Event.EventType.NodeDataChanged) {
-          watchDataChange(zkClient, path, semaphore, stateMatch);
-        }
-      }
-    }), new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        String content = new String(result.getData(), Charsets.UTF_8);
-        JsonObject json = new Gson().fromJson(content, JsonElement.class).getAsJsonObject();
-        if (stateMatch.get().equals(json.get("state").getAsString())) {
-          semaphore.release();
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        exists();
-      }
-
-      private void exists() {
-        Futures.addCallback(zkClient.exists(path, new Watcher() {
-          @Override
-          public void process(WatchedEvent event) {
-            if (event.getType() == Event.EventType.NodeCreated) {
-              watchDataChange(zkClient, path, semaphore, stateMatch);
-            }
-          }
-        }), new FutureCallback<Stat>() {
-          @Override
-          public void onSuccess(Stat result) {
-            if (result != null) {
-              watchDataChange(zkClient, path, semaphore, stateMatch);
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            LOG.error(t.getMessage(), t);
-          }
-        });
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
deleted file mode 100644
index 508cadb..0000000
--- a/core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import org.apache.twill.filesystem.LocalLocationFactory;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.ApplicationBundler;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
-
-/**
- *
- */
-public class ApplicationBundlerTest {
-
-  @Rule
-  public TemporaryFolder tmpDir = new TemporaryFolder();
-
-  @Test
-  public void testFindDependencies() throws IOException, ClassNotFoundException {
-    Location location = new LocalLocationFactory(tmpDir.newFolder()).create("test.jar");
-
-    // Create a jar file with by tracing dependency
-    ApplicationBundler bundler = new ApplicationBundler(ImmutableList.<String>of());
-    bundler.createBundle(location, ApplicationBundler.class);
-
-    File targetDir = tmpDir.newFolder();
-    unjar(new File(location.toURI()), targetDir);
-
-    // Load the class back, it should be loaded by the custom classloader
-    ClassLoader classLoader = createClassLoader(targetDir);
-    Class<?> clz = classLoader.loadClass(ApplicationBundler.class.getName());
-    Assert.assertSame(classLoader, clz.getClassLoader());
-
-    // For system classes, they shouldn't be packaged, hence loaded by different classloader.
-    clz = classLoader.loadClass(Object.class.getName());
-    Assert.assertNotSame(classLoader, clz.getClassLoader());
-  }
-
-  private void unjar(File jarFile, File targetDir) throws IOException {
-    JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
-    try {
-      JarEntry jarEntry = jarInput.getNextJarEntry();
-      while (jarEntry != null) {
-        File target = new File(targetDir, jarEntry.getName());
-        if (jarEntry.isDirectory()) {
-          target.mkdirs();
-        } else {
-          target.getParentFile().mkdirs();
-          ByteStreams.copy(jarInput, Files.newOutputStreamSupplier(target));
-        }
-
-        jarEntry = jarInput.getNextJarEntry();
-      }
-    } finally {
-      jarInput.close();
-    }
-  }
-
-  private ClassLoader createClassLoader(File dir) throws MalformedURLException {
-    List<URL> urls = Lists.newArrayList();
-    urls.add(new File(dir, "classes").toURI().toURL());
-    File[] libFiles = new File(dir, "lib").listFiles();
-    if (libFiles != null) {
-      for (File file : libFiles) {
-        urls.add(file.toURI().toURL());
-      }
-    }
-    return new URLClassLoader(urls.toArray(new URL[0])) {
-      @Override
-      protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
-        // Load class from the given URLs first before delegating to parent.
-        try {
-          return super.findClass(name);
-        } catch (ClassNotFoundException e) {
-          ClassLoader parent = getParent();
-          return parent == null ? ClassLoader.getSystemClassLoader().loadClass(name) : parent.loadClass(name);
-        }
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
deleted file mode 100644
index 40fc3ed..0000000
--- a/core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import org.apache.twill.common.Services;
-import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.kafka.client.Compression;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.Futures;
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.ArchiveException;
-import org.apache.commons.compress.archivers.ArchiveInputStream;
-import org.apache.commons.compress.archivers.ArchiveStreamFactory;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class KafkaTest {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
-
-  @ClassRule
-  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
-
-  private static InMemoryZKServer zkServer;
-  private static EmbeddedKafkaServer kafkaServer;
-  private static ZKClientService zkClientService;
-  private static KafkaClient kafkaClient;
-
-  @BeforeClass
-  public static void init() throws Exception {
-    zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
-    zkServer.startAndWait();
-
-    // Extract the kafka.tgz and start the kafka server
-    kafkaServer = new EmbeddedKafkaServer(extractKafka(), generateKafkaConfig(zkServer.getConnectionStr()));
-    kafkaServer.startAndWait();
-
-    zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-
-    kafkaClient = new SimpleKafkaClient(zkClientService);
-    Services.chainStart(zkClientService, kafkaClient).get();
-  }
-
-  @AfterClass
-  public static void finish() throws Exception {
-    Services.chainStop(kafkaClient, zkClientService).get();
-    kafkaServer.stopAndWait();
-    zkServer.stopAndWait();
-  }
-
-  @Test
-  public void testKafkaClient() throws Exception {
-    String topic = "testClient";
-
-    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
-    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10);
-
-    t1.start();
-    t2.start();
-
-    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10);
-    t2.join();
-    t3.start();
-
-    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, 0, 1048576);
-    int count = 0;
-    long startTime = System.nanoTime();
-    while (count < 30 && consumer.hasNext() && secondsPassed(startTime, TimeUnit.NANOSECONDS) < 5) {
-      LOG.info(Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
-      count++;
-    }
-
-    Assert.assertEquals(30, count);
-  }
-
-  @Test (timeout = 10000)
-  public void testOffset() throws Exception {
-    String topic = "testOffset";
-
-    // Initial earliest offset should be 0.
-    long[] offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
-    Assert.assertArrayEquals(new long[]{0L}, offsets);
-
-    // Publish some messages
-    Thread publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 2000);
-    publishThread.start();
-    publishThread.join();
-
-    // Fetch earliest offset, should still be 0.
-    offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
-    Assert.assertArrayEquals(new long[]{0L}, offsets);
-
-    // Fetch latest offset
-    offsets = kafkaClient.getOffset(topic, 0, -1, 10).get();
-    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, offsets[0], 1048576);
-
-    // Publish one more message, the consumer should see the new message being published.
-    publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 1, 3000);
-    publishThread.start();
-    publishThread.join();
-
-    // Should see the last message being published.
-    Assert.assertTrue(consumer.hasNext());
-    Assert.assertEquals("3000 Testing", Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
-  }
-
-  private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
-                                     final Compression compression, final String message, final int count) {
-    return createPublishThread(kafkaClient, topic, compression, message, count, 0);
-  }
-
-  private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
-                                     final String message, final int count, final int base) {
-    return new Thread() {
-      public void run() {
-        PreparePublish preparePublish = kafkaClient.preparePublish(topic, compression);
-        for (int i = 0; i < count; i++) {
-          preparePublish.add(((base + i) + " " + message).getBytes(Charsets.UTF_8), 0);
-        }
-        Futures.getUnchecked(preparePublish.publish());
-      }
-    };
-  }
-
-  private long secondsPassed(long startTime, TimeUnit startUnit) {
-    return TimeUnit.SECONDS.convert(System.nanoTime() - TimeUnit.NANOSECONDS.convert(startTime, startUnit),
-                                    TimeUnit.NANOSECONDS);
-  }
-
-  private static File extractKafka() throws IOException, ArchiveException, CompressorException {
-    File kafkaExtract = TMP_FOLDER.newFolder();
-    InputStream kakfaResource = KafkaTest.class.getClassLoader().getResourceAsStream("kafka-0.7.2.tgz");
-    ArchiveInputStream archiveInput = new ArchiveStreamFactory()
-      .createArchiveInputStream(ArchiveStreamFactory.TAR,
-                                new CompressorStreamFactory()
-                                  .createCompressorInputStream(CompressorStreamFactory.GZIP, kakfaResource));
-
-    try {
-      ArchiveEntry entry = archiveInput.getNextEntry();
-      while (entry != null) {
-        File file = new File(kafkaExtract, entry.getName());
-        if (entry.isDirectory()) {
-          file.mkdirs();
-        } else {
-          ByteStreams.copy(archiveInput, Files.newOutputStreamSupplier(file));
-        }
-        entry = archiveInput.getNextEntry();
-      }
-    } finally {
-      archiveInput.close();
-    }
-    return kafkaExtract;
-  }
-
-  private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
-    int port = Networks.getRandomPort();
-    Preconditions.checkState(port > 0, "Failed to get random port.");
-
-    Properties prop = new Properties();
-    prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
-    prop.setProperty("zk.connect", zkConnectStr);
-    prop.setProperty("num.threads", "8");
-    prop.setProperty("port", Integer.toString(port));
-    prop.setProperty("log.flush.interval", "1000");
-    prop.setProperty("max.socket.request.bytes", "104857600");
-    prop.setProperty("log.cleanup.interval.mins", "1");
-    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
-    prop.setProperty("zk.connectiontimeout.ms", "1000000");
-    prop.setProperty("socket.receive.buffer", "1048576");
-    prop.setProperty("enable.zookeeper", "true");
-    prop.setProperty("log.retention.hours", "24");
-    prop.setProperty("brokerid", "0");
-    prop.setProperty("socket.send.buffer", "1048576");
-    prop.setProperty("num.partitions", "1");
-    // Use a really small file size to force some flush to happen
-    prop.setProperty("log.file.size", "1024");
-    prop.setProperty("log.default.flush.interval.ms", "1000");
-    return prop;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/logback-test.xml b/core/src/test/resources/logback-test.xml
deleted file mode 100644
index 3c36660..0000000
--- a/core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!-- Default logback configuration for twill library -->
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="org.apache.hadoop" level="WARN" />
-    <logger name="org.apache.zookeeper" level="WARN" />
-
-    <root level="INFO">
-        <appender-ref ref="STDOUT"/>
-    </root>
-
-</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/discovery-api/pom.xml b/discovery-api/pom.xml
deleted file mode 100644
index e41b214..0000000
--- a/discovery-api/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>twill-parent</artifactId>
-        <groupId>org.apache.twill</groupId>
-        <version>0.1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>twill-discovery-api</artifactId>
-    <name>Twill discovery service API</name>
-
-    <dependencies>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
----------------------------------------------------------------------
diff --git a/discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java b/discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
deleted file mode 100644
index a5529fe..0000000
--- a/discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.twill.discovery;
-
-import java.net.InetSocketAddress;
-
-/**
- * Discoverable defines the attributes of service to be discovered.
- */
-public interface Discoverable {
-
-  /**
-   * @return Name of the service
-   */
-  String getName();
-
-  /**
-   * @return An {@link InetSocketAddress} representing the host+port of the service.
-   */
-  InetSocketAddress getSocketAddress();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
----------------------------------------------------------------------
diff --git a/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java b/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
deleted file mode 100644
index a26fff8..0000000
--- a/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-
-import org.apache.twill.common.Cancellable;
-
-/**
- * DiscoveryService defines interface for registering {@link Discoverable}.
- */
-public interface DiscoveryService {
-
-  /**
-   * Registers a {@link Discoverable} service.
-   * @param discoverable Information of the service provider that could be discovered.
-   * @return A {@link Cancellable} for un-registration.
-   */
-  Cancellable register(Discoverable discoverable);
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
deleted file mode 100644
index 89cf269..0000000
--- a/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-/**
- * Interface for {@link DiscoveryServiceClient} to discover services registered with {@link DiscoveryService}.
- */
-public interface DiscoveryServiceClient {
-
-  /**
-   * Retrieves a list of {@link Discoverable} for the a service with the given name.
-   *
-   * @param name Name of the service
-   * @return A live {@link Iterable} that on each call to {@link Iterable#iterator()} returns
-   *         an {@link java.util.Iterator Iterator} that reflects the latest set of
-   *         available {@link Discoverable} services.
-   */
-  Iterable<Discoverable> discover(String name);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/discovery-core/pom.xml b/discovery-core/pom.xml
deleted file mode 100644
index 2612138..0000000
--- a/discovery-core/pom.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>twill-parent</artifactId>
-        <groupId>org.apache.twill</groupId>
-        <version>0.1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>twill-discovery-core</artifactId>
-    <name>Twill discovery service implementations</name>
-
-    <dependencies>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-discovery-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-zookeeper</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java b/discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
deleted file mode 100644
index 5fa97d1..0000000
--- a/discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import java.net.InetSocketAddress;
-
-/**
- * Wrapper for a discoverable.
- */
-final class DiscoverableWrapper implements Discoverable {
-  private final String name;
-  private final InetSocketAddress address;
-
-  DiscoverableWrapper(Discoverable discoverable) {
-    this.name = discoverable.getName();
-    this.address = discoverable.getSocketAddress();
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public InetSocketAddress getSocketAddress() {
-    return address;
-  }
-
-  @Override
-  public String toString() {
-    return "{name=" + name + ", address=" + address;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    Discoverable other = (Discoverable) o;
-
-    return name.equals(other.getName()) && address.equals(other.getSocketAddress());
-  }
-
-  @Override
-  public int hashCode() {
-    int result = name.hashCode();
-    result = 31 * result + address.hashCode();
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
deleted file mode 100644
index 7a9e984..0000000
--- a/discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import org.apache.twill.common.Cancellable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
-
-import java.util.Iterator;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A simple in memory implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
- */
-public class InMemoryDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
-
-  private final Multimap<String, Discoverable> services = HashMultimap.create();
-  private final Lock lock = new ReentrantLock();
-
-  @Override
-  public Cancellable register(final Discoverable discoverable) {
-    lock.lock();
-    try {
-      final Discoverable wrapper = new DiscoverableWrapper(discoverable);
-      services.put(wrapper.getName(), wrapper);
-      return new Cancellable() {
-        @Override
-        public void cancel() {
-          lock.lock();
-          try {
-            services.remove(wrapper.getName(), wrapper);
-          } finally {
-            lock.unlock();
-          }
-        }
-      };
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public Iterable<Discoverable> discover(final String name) {
-    return new Iterable<Discoverable>() {
-      @Override
-      public Iterator<Discoverable> iterator() {
-        lock.lock();
-        try {
-          return ImmutableList.copyOf(services.get(name)).iterator();
-        } finally {
-          lock.unlock();
-        }
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
deleted file mode 100644
index e2f9bc0..0000000
--- a/discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClients;
-import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.base.Charsets;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Type;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
- * <p>
- *   Discoverable services are registered within Zookeeper under the namespace 'discoverable' by default.
- *   If you would like to change the namespace under which the services are registered then you can pass
- *   in the namespace during construction of {@link ZKDiscoveryService}.
- * </p>
- *
- * <p>
- *   Following is a simple example of how {@link ZKDiscoveryService} can be used for registering services
- *   and also for discovering the registered services.
- *   <blockquote>
- *    <pre>
- *      {@code
- *
- *      DiscoveryService service = new ZKDiscoveryService(zkClient);
- *      service.register(new Discoverable() {
- *        @Override
- *        public String getName() {
- *          return 'service-name';
- *        }
- *
- *        @Override
- *        public InetSocketAddress getSocketAddress() {
- *          return new InetSocketAddress(hostname, port);
- *        }
- *      });
- *      ...
- *      ...
- *      Iterable<Discoverable> services = service.discovery("service-name");
- *      ...
- *      }
- *    </pre>
- *   </blockquote>
- * </p>
- */
-public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
-  private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
-  private static final String NAMESPACE = "/discoverable";
-
-  private static final long RETRY_MILLIS = 1000;
-
-  // In memory map for recreating ephemeral nodes after session expires.
-  // It map from discoverable to the corresponding Cancellable
-  private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
-  private final Lock lock;
-
-  private final LoadingCache<String, Iterable<Discoverable>> services;
-  private final ZKClient zkClient;
-  private final ScheduledExecutorService retryExecutor;
-
-  /**
-   * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry.
-   * @param zkClient The {@link ZKClient} for interacting with zookeeper.
-   */
-  public ZKDiscoveryService(ZKClient zkClient) {
-    this(zkClient, NAMESPACE);
-  }
-
-  /**
-   * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry under namepsace.
-   * @param zkClient of zookeeper quorum
-   * @param namespace under which the service registered would be stored in zookeeper.
-   *                  If namespace is {@code null}, no namespace will be used.
-   */
-  public ZKDiscoveryService(ZKClient zkClient, String namespace) {
-    this.discoverables = HashMultimap.create();
-    this.lock = new ReentrantLock();
-    this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
-      Threads.createDaemonThreadFactory("zk-discovery-retry"));
-    this.zkClient = namespace == null ? zkClient : ZKClients.namespace(zkClient, namespace);
-    this.services = CacheBuilder.newBuilder().build(createServiceLoader());
-    this.zkClient.addConnectionWatcher(createConnectionWatcher());
-  }
-
-  /**
-   * Registers a {@link Discoverable} in zookeeper.
-   * <p>
-   *   Registering a {@link Discoverable} will create a node <base>/<service-name>
-   *   in zookeeper as a ephemeral node. If the node already exists (timeout associated with emphemeral, then a runtime
-   *   exception is thrown to make sure that a service with an intent to register is not started without registering.
-   *   When a runtime is thrown, expectation is that the process being started with fail and would be started again
-   *   by the monitoring service.
-   * </p>
-   * @param discoverable Information of the service provider that could be discovered.
-   * @return An instance of {@link Cancellable}
-   */
-  @Override
-  public Cancellable register(final Discoverable discoverable) {
-    final Discoverable wrapper = new DiscoverableWrapper(discoverable);
-    final SettableFuture<String> future = SettableFuture.create();
-    final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper);
-
-    // Create the zk ephemeral node.
-    Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() {
-      @Override
-      public void onSuccess(String result) {
-        // Set the sequence node path to cancellable for future cancellation.
-        cancellable.setPath(result);
-        lock.lock();
-        try {
-          discoverables.put(wrapper, cancellable);
-        } finally {
-          lock.unlock();
-        }
-        LOG.debug("Service registered: {} {}", wrapper, result);
-        future.set(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        if (t instanceof KeeperException.NodeExistsException) {
-          handleRegisterFailure(discoverable, future, this, t);
-        } else {
-          LOG.warn("Failed to register: {}", wrapper, t);
-          future.setException(t);
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    Futures.getUnchecked(future);
-    return cancellable;
-  }
-
-  @Override
-  public Iterable<Discoverable> discover(String service) {
-    return services.getUnchecked(service);
-  }
-
-  /**
-   * Handle registration failure.
-   *
-   * @param discoverable The discoverable to register.
-   * @param completion A settable future to set when registration is completed / failed.
-   * @param creationCallback A future callback for path creation.
-   * @param failureCause The original cause of failure.
-   */
-  private void handleRegisterFailure(final Discoverable discoverable,
-                                     final SettableFuture<String> completion,
-                                     final FutureCallback<String> creationCallback,
-                                     final Throwable failureCause) {
-
-    final String path = getNodePath(discoverable);
-    Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
-      @Override
-      public void onSuccess(Stat result) {
-        if (result == null) {
-          // If the node is gone, simply retry.
-          LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
-          retryRegister(discoverable, creationCallback);
-          return;
-        }
-
-        long ephemeralOwner = result.getEphemeralOwner();
-        if (ephemeralOwner == 0) {
-          // it is not an ephemeral node, something wrong.
-          LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.",
-                    path, discoverable);
-          completion.setException(failureCause);
-          return;
-        }
-        Long sessionId = zkClient.getSessionId();
-        if (sessionId == null || ephemeralOwner != sessionId) {
-          // This zkClient is not valid or doesn't own the ephemeral node, simply keep retrying.
-          LOG.info("Owner of {} is different. Retry registration for {}.", path, discoverable);
-          retryRegister(discoverable, creationCallback);
-        } else {
-          // This client owned the node, treat the registration as completed.
-          // This could happen if same client tries to register twice (due to mistake or failure race condition).
-          completion.set(path);
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // If exists call failed, simply retry creation.
-        LOG.warn("Error when getting stats on {}. Retry registration for {}.", path, discoverable);
-        retryRegister(discoverable, creationCallback);
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-  private OperationFuture<String> doRegister(Discoverable discoverable) {
-    byte[] discoverableBytes = encode(discoverable);
-    return zkClient.create(getNodePath(discoverable), discoverableBytes, CreateMode.EPHEMERAL, true);
-  }
-
-  private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
-    retryExecutor.schedule(new Runnable() {
-
-      @Override
-      public void run() {
-        Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);
-      }
-    }, RETRY_MILLIS, TimeUnit.MILLISECONDS);
-  }
-
-
-  /**
-   * Generate unique node path for a given {@link Discoverable}.
-   * @param discoverable An instance of {@link Discoverable}.
-   * @return A node name based on the discoverable.
-   */
-  private String getNodePath(Discoverable discoverable) {
-    InetSocketAddress socketAddress = discoverable.getSocketAddress();
-    String node = Hashing.md5()
-                         .newHasher()
-                         .putBytes(socketAddress.getAddress().getAddress())
-                         .putInt(socketAddress.getPort())
-                         .hash().toString();
-
-    return String.format("/%s/%s", discoverable.getName(), node);
-  }
-
-  private Watcher createConnectionWatcher() {
-    return new Watcher() {
-      // Watcher is invoked from single event thread, hence safe to use normal mutable variable.
-      private boolean expired;
-
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getState() == Event.KeeperState.Expired) {
-          LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
-          expired = true;
-        } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
-          LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
-          expired = false;
-
-          // Re-register all services
-          lock.lock();
-          try {
-            for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
-              LOG.info("Re-registering service: {}", entry.getKey());
-
-              // Must be non-blocking in here.
-              Futures.addCallback(doRegister(entry.getKey()), new FutureCallback<String>() {
-                @Override
-                public void onSuccess(String result) {
-                  // Updates the cancellable to the newly created sequential node.
-                  entry.getValue().setPath(result);
-                  LOG.debug("Service re-registered: {} {}", entry.getKey(), result);
-                }
-
-                @Override
-                public void onFailure(Throwable t) {
-                  // When failed to create the node, there would be no retry and simply make the cancellable do nothing.
-                  entry.getValue().setPath(null);
-                  LOG.error("Failed to re-register service: {}", entry.getKey(), t);
-                }
-              }, Threads.SAME_THREAD_EXECUTOR);
-            }
-          } finally {
-            lock.unlock();
-          }
-        }
-      }
-    };
-  }
-
-  /**
-   * Creates a CacheLoader for creating live Iterable for watching instances changes for a given service.
-   */
-  private CacheLoader<String, Iterable<Discoverable>> createServiceLoader() {
-    return new CacheLoader<String, Iterable<Discoverable>>() {
-      @Override
-      public Iterable<Discoverable> load(String service) throws Exception {
-        // The atomic reference is to keep the resulting Iterable live. It always contains a
-        // immutable snapshot of the latest detected set of Discoverable.
-        final AtomicReference<Iterable<Discoverable>> iterable =
-              new AtomicReference<Iterable<Discoverable>>(ImmutableList.<Discoverable>of());
-        final String serviceBase = "/" + service;
-
-        // Watch for children changes in /service
-        ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
-          @Override
-          public void updated(NodeChildren nodeChildren) {
-            // Fetch data of all children nodes in parallel.
-            List<String> children = nodeChildren.getChildren();
-            List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size());
-            for (String child : children) {
-              dataFutures.add(zkClient.getData(serviceBase + "/" + child));
-            }
-
-            // Update the service map when all fetching are done.
-            final ListenableFuture<List<NodeData>> fetchFuture = Futures.successfulAsList(dataFutures);
-            fetchFuture.addListener(new Runnable() {
-              @Override
-              public void run() {
-                ImmutableList.Builder<Discoverable> builder = ImmutableList.builder();
-                for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) {
-                  // For successful fetch, decode the content.
-                  if (nodeData != null) {
-                    Discoverable discoverable = decode(nodeData.getData());
-                    if (discoverable != null) {
-                      builder.add(discoverable);
-                    }
-                  }
-                }
-                iterable.set(builder.build());
-              }
-            }, Threads.SAME_THREAD_EXECUTOR);
-          }
-        });
-
-        return new Iterable<Discoverable>() {
-          @Override
-          public Iterator<Discoverable> iterator() {
-            return iterable.get().iterator();
-          }
-        };
-      }
-    };
-  }
-
-  /**
-   * Static helper function for decoding array of bytes into a {@link DiscoverableWrapper} object.
-   * @param bytes representing serialized {@link DiscoverableWrapper}
-   * @return null if bytes are null; else an instance of {@link DiscoverableWrapper}
-   */
-  private static Discoverable decode(byte[] bytes) {
-    if (bytes == null) {
-      return null;
-    }
-    String content = new String(bytes, Charsets.UTF_8);
-    return new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec())
-      .create()
-      .fromJson(content, Discoverable.class);
-  }
-
-  /**
-   * Static helper function for encoding an instance of {@link DiscoverableWrapper} into array of bytes.
-   * @param discoverable An instance of {@link Discoverable}
-   * @return array of bytes representing an instance of <code>discoverable</code>
-   */
-  private static byte[] encode(Discoverable discoverable) {
-    return new GsonBuilder().registerTypeAdapter(DiscoverableWrapper.class, new DiscoverableCodec())
-      .create()
-      .toJson(discoverable, DiscoverableWrapper.class)
-      .getBytes(Charsets.UTF_8);
-  }
-
-  /**
-   * Inner class for cancelling (un-register) discovery service.
-   */
-  private final class DiscoveryCancellable implements Cancellable {
-
-    private final Discoverable discoverable;
-    private final AtomicBoolean cancelled;
-    private volatile String path;
-
-    DiscoveryCancellable(Discoverable discoverable) {
-      this.discoverable = discoverable;
-      this.cancelled = new AtomicBoolean();
-    }
-
-    /**
-     * Set the zk node path representing the ephemeral sequence node of this registered discoverable.
-     * Called from ZK event thread when creating of the node completed, either from normal registration or
-     * re-registration due to session expiration.
-     *
-     * @param path The path to ephemeral sequence node.
-     */
-    void setPath(String path) {
-      this.path = path;
-      if (cancelled.get() && path != null) {
-        // Simply delete the path if it's already cancelled
-        // It's for the case when session expire happened and re-registration completed after this has been cancelled.
-        // Not bother with the result as if there is error, nothing much we could do.
-        zkClient.delete(path);
-      }
-    }
-
-    @Override
-    public void cancel() {
-      if (!cancelled.compareAndSet(false, true)) {
-        return;
-      }
-
-      // Take a snapshot of the volatile path.
-      String path = this.path;
-
-      // If it is null, meaning cancel() is called before the ephemeral node is created, hence
-      // setPath() will be called in future (through zk callback when creation is completed)
-      // so that deletion will be done in setPath().
-      if (path == null) {
-        return;
-      }
-
-      // Remove this Cancellable from the map so that upon session expiration won't try to register.
-      lock.lock();
-      try {
-        discoverables.remove(discoverable, this);
-      } finally {
-        lock.unlock();
-      }
-
-      // Delete the path. It's ok if the path not exists
-      // (e.g. what session expired and before node has been re-created)
-      Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
-                                                    KeeperException.NoNodeException.class, path));
-      LOG.debug("Service unregistered: {} {}", discoverable, path);
-    }
-  }
-
-  /**
-   * SerDe for converting a {@link DiscoverableWrapper} into a JSON object
-   * or from a JSON object into {@link DiscoverableWrapper}.
-   */
-  private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
-
-    @Override
-    public Discoverable deserialize(JsonElement json, Type typeOfT,
-                                    JsonDeserializationContext context) throws JsonParseException {
-      JsonObject jsonObj = json.getAsJsonObject();
-      final String service = jsonObj.get("service").getAsString();
-      String hostname = jsonObj.get("hostname").getAsString();
-      int port = jsonObj.get("port").getAsInt();
-      final InetSocketAddress address = new InetSocketAddress(hostname, port);
-      return new Discoverable() {
-        @Override
-        public String getName() {
-          return service;
-        }
-
-        @Override
-        public InetSocketAddress getSocketAddress() {
-          return address;
-        }
-      };
-    }
-
-    @Override
-    public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
-      JsonObject jsonObj = new JsonObject();
-      jsonObj.addProperty("service", src.getName());
-      jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
-      jsonObj.addProperty("port", src.getSocketAddress().getPort());
-      return jsonObj;
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/main/java/org/apache/twill/discovery/package-info.java b/discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
deleted file mode 100644
index a1d6e0c..0000000
--- a/discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Classes in this package provides service discovery implementations.
- */
-package org.apache.twill.discovery;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java b/discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
deleted file mode 100644
index d8cc375..0000000
--- a/discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.twill.discovery;
-
-import org.apache.twill.common.Cancellable;
-import com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test memory based service discovery service.
- */
-public class InMemoryDiscoveryServiceTest {
-  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
-    return service.register(new Discoverable() {
-      @Override
-      public String getName() {
-        return name;
-      }
-
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return new InetSocketAddress(host, port);
-      }
-    });
-  }
-
-  @Test
-  public void simpleDiscoverable() throws Exception {
-    DiscoveryService discoveryService = new InMemoryDiscoveryService();
-    DiscoveryServiceClient discoveryServiceClient = (DiscoveryServiceClient) discoveryService;
-
-    // Register one service running on one host:port
-    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
-
-    // Discover that registered host:port.
-    Assert.assertTrue(Iterables.size(discoverables) == 1);
-
-    // Remove the service
-    cancellable.cancel();
-
-    // There should be no service.
-    discoverables = discoveryServiceClient.discover("foo");
-    TimeUnit.MILLISECONDS.sleep(100);
-    Assert.assertTrue(Iterables.size(discoverables) == 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
deleted file mode 100644
index feee8db..0000000
--- a/discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.common.Services;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.internal.zookeeper.KillZKSession;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test Zookeeper based discovery service.
- */
-public class ZKDiscoveryServiceTest {
-  private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryServiceTest.class);
-
-  private static InMemoryZKServer zkServer;
-  private static ZKClientService zkClient;
-
-  @BeforeClass
-  public static void beforeClass() {
-    zkServer = InMemoryZKServer.builder().setTickTime(100000).build();
-    zkServer.startAndWait();
-
-    zkClient = ZKClientServices.delegate(
-      ZKClients.retryOnFailure(
-        ZKClients.reWatchOnExpire(
-          ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
-        RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
-    zkClient.startAndWait();
-  }
-
-  @AfterClass
-  public static void afterClass() {
-    Futures.getUnchecked(Services.chainStop(zkClient, zkServer));
-  }
-
-  private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
-    return service.register(new Discoverable() {
-      @Override
-      public String getName() {
-        return name;
-      }
-
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return new InetSocketAddress(host, port);
-      }
-    });
-  }
-
-
-  private boolean waitTillExpected(int expected, Iterable<Discoverable> discoverables) throws Exception {
-    for (int i = 0; i < 10; ++i) {
-      TimeUnit.MILLISECONDS.sleep(10);
-      if (Iterables.size(discoverables) == expected) {
-        return true;
-      }
-    }
-    return (Iterables.size(discoverables) == expected);
-  }
-
-  @Test (timeout = 5000)
-  public void testDoubleRegister() throws Exception {
-    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = discoveryService;
-
-    // Register on the same host port, it shouldn't fail.
-    Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
-    Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
-
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_double_reg");
-
-    Assert.assertTrue(waitTillExpected(1, discoverables));
-
-    cancellable.cancel();
-    cancellable2.cancel();
-
-    // Register again with two different clients, but killing session of the first one.
-    final ZKClientService zkClient2 = ZKClientServices.delegate(
-      ZKClients.retryOnFailure(
-        ZKClients.reWatchOnExpire(
-          ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
-        RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
-    zkClient2.startAndWait();
-
-    try {
-      ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
-      cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
-
-      // Schedule a thread to shutdown zkClient2.
-      new Thread() {
-        @Override
-        public void run() {
-          try {
-            TimeUnit.SECONDS.sleep(2);
-            zkClient2.stopAndWait();
-          } catch (InterruptedException e) {
-            LOG.error(e.getMessage(), e);
-          }
-        }
-      }.start();
-
-      // This call would block until zkClient2 is shutdown.
-      cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
-      cancellable.cancel();
-
-    } finally {
-      zkClient2.stopAndWait();
-    }
-  }
-
-  @Test
-  public void testSessionExpires() throws Exception {
-    ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = discoveryService;
-
-    Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
-
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_expires");
-
-    // Discover that registered host:port.
-    Assert.assertTrue(waitTillExpected(1, discoverables));
-
-    KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 5000);
-
-    // Register one more endpoint to make sure state has been reflected after reconnection
-    Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
-
-    // Reconnection would trigger re-registration.
-    Assert.assertTrue(waitTillExpected(2, discoverables));
-
-    cancellable.cancel();
-    cancellable2.cancel();
-
-    // Verify that both are now gone.
-    Assert.assertTrue(waitTillExpected(0, discoverables));
-  }
-
-  @Test
-  public void simpleDiscoverable() throws Exception {
-    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
-    // Register one service running on one host:port
-    Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
-
-    // Discover that registered host:port.
-    Assert.assertTrue(waitTillExpected(1, discoverables));
-
-    // Remove the service
-    cancellable.cancel();
-
-    // There should be no service.
-
-    discoverables = discoveryServiceClient.discover("foo");
-
-    Assert.assertTrue(waitTillExpected(0, discoverables));
-  }
-
-  @Test
-  public void manySameDiscoverable() throws Exception {
-    List<Cancellable> cancellables = Lists.newArrayList();
-    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
-    cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
-
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("manyDiscoverable");
-    Assert.assertTrue(waitTillExpected(5, discoverables));
-
-    for (int i = 0; i < 5; i++) {
-      cancellables.get(i).cancel();
-      Assert.assertTrue(waitTillExpected(4 - i, discoverables));
-    }
-  }
-
-  @Test
-  public void multiServiceDiscoverable() throws Exception {
-    List<Cancellable> cancellables = Lists.newArrayList();
-    DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
-    DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
-    cancellables.add(register(discoveryService, "service1", "localhost", 1));
-    cancellables.add(register(discoveryService, "service1", "localhost", 2));
-    cancellables.add(register(discoveryService, "service1", "localhost", 3));
-    cancellables.add(register(discoveryService, "service1", "localhost", 4));
-    cancellables.add(register(discoveryService, "service1", "localhost", 5));
-
-    cancellables.add(register(discoveryService, "service2", "localhost", 1));
-    cancellables.add(register(discoveryService, "service2", "localhost", 2));
-    cancellables.add(register(discoveryService, "service2", "localhost", 3));
-
-    cancellables.add(register(discoveryService, "service3", "localhost", 1));
-    cancellables.add(register(discoveryService, "service3", "localhost", 2));
-
-    Iterable<Discoverable> discoverables = discoveryServiceClient.discover("service1");
-    Assert.assertTrue(waitTillExpected(5, discoverables));
-
-    discoverables = discoveryServiceClient.discover("service2");
-    Assert.assertTrue(waitTillExpected(3, discoverables));
-
-    discoverables = discoveryServiceClient.discover("service3");
-    Assert.assertTrue(waitTillExpected(2, discoverables));
-
-    cancellables.add(register(discoveryService, "service3", "localhost", 3));
-    Assert.assertTrue(waitTillExpected(3, discoverables)); // Shows live iterator.
-
-    for (Cancellable cancellable : cancellables) {
-      cancellable.cancel();
-    }
-
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
-    Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/discovery-core/src/test/resources/logback-test.xml b/discovery-core/src/test/resources/logback-test.xml
deleted file mode 100644
index 2615cb4..0000000
--- a/discovery-core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!-- Default logback configuration for twill library -->
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="org.apache.twill" level="DEBUG" />
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-
-</configuration>