You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:38 UTC
[22/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/internal/ControllerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/internal/ControllerTest.java b/core/src/test/java/org/apache/twill/internal/ControllerTest.java
deleted file mode 100644
index 382dc95..0000000
--- a/core/src/test/java/org/apache/twill/internal/ControllerTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClientService;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Service;
-import com.google.gson.JsonObject;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- *
- */
-public class ControllerTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(ControllerTest.class);
-
- @Test
- public void testController() throws ExecutionException, InterruptedException, TimeoutException {
- InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
-
- LOG.info("ZKServer: " + zkServer.getConnectionStr());
-
- try {
- RunId runId = RunIds.generate();
- ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClientService.startAndWait();
-
- Service service = createService(zkClientService, runId);
- service.startAndWait();
-
- TwillController controller = getController(zkClientService, runId);
- controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
- controller.stop().get(2, TimeUnit.SECONDS);
-
- Assert.assertEquals(ServiceController.State.TERMINATED, controller.state());
-
- final CountDownLatch terminateLatch = new CountDownLatch(1);
- service.addListener(new ServiceListenerAdapter() {
- @Override
- public void terminated(Service.State from) {
- terminateLatch.countDown();
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS));
-
- zkClientService.stopAndWait();
-
- } finally {
- zkServer.stopAndWait();
- }
- }
-
- // Test controller created before service starts.
- @Test
- public void testControllerBefore() throws InterruptedException {
- InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
-
- LOG.info("ZKServer: " + zkServer.getConnectionStr());
- try {
- RunId runId = RunIds.generate();
- ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClientService.startAndWait();
-
- final CountDownLatch runLatch = new CountDownLatch(1);
- final CountDownLatch stopLatch = new CountDownLatch(1);
- TwillController controller = getController(zkClientService, runId);
- controller.addListener(new ServiceListenerAdapter() {
- @Override
- public void running() {
- runLatch.countDown();
- }
-
- @Override
- public void terminated(Service.State from) {
- stopLatch.countDown();
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- Service service = createService(zkClientService, runId);
- service.start();
-
- Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
- Assert.assertFalse(stopLatch.await(2, TimeUnit.SECONDS));
-
- service.stop();
-
- Assert.assertTrue(stopLatch.await(2, TimeUnit.SECONDS));
-
- } finally {
- zkServer.stopAndWait();
- }
- }
-
- // Test controller listener receive first state change without state transition from service
- @Test
- public void testControllerListener() throws InterruptedException {
- InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
-
- LOG.info("ZKServer: " + zkServer.getConnectionStr());
- try {
- RunId runId = RunIds.generate();
- ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClientService.startAndWait();
-
- Service service = createService(zkClientService, runId);
- service.startAndWait();
-
- final CountDownLatch runLatch = new CountDownLatch(1);
- TwillController controller = getController(zkClientService, runId);
- controller.addListener(new ServiceListenerAdapter() {
- @Override
- public void running() {
- runLatch.countDown();
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
-
- service.stopAndWait();
-
- zkClientService.stopAndWait();
- } finally {
- zkServer.stopAndWait();
- }
- }
-
- private Service createService(ZKClient zkClient, RunId runId) {
- return new ZKServiceDecorator(
- zkClient, runId, Suppliers.ofInstance(new JsonObject()), new AbstractIdleService() {
-
- @Override
- protected void startUp() throws Exception {
- LOG.info("Start");
- }
-
- @Override
- protected void shutDown() throws Exception {
- LOG.info("Stop");
- }
- });
- }
-
- private TwillController getController(ZKClient zkClient, RunId runId) {
- TwillController controller = new AbstractTwillController(runId, zkClient, ImmutableList.<LogHandler>of()) {
-
- @Override
- public void kill() {
- // No-op
- }
-
- @Override
- protected void instanceNodeUpdated(NodeData nodeData) {
- // No-op
- }
-
- @Override
- protected void stateNodeUpdated(StateNode stateNode) {
- // No-op
- }
-
- @Override
- public ResourceReport getResourceReport() {
- return null;
- }
- };
- controller.startAndWait();
- return controller;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java b/core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
deleted file mode 100644
index d267cf8..0000000
--- a/core/src/test/java/org/apache/twill/internal/state/MessageCodecTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-import com.google.common.collect.ImmutableMap;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- *
- */
-public class MessageCodecTest {
-
- @Test
- public void testCodec() {
- Message message = MessageCodec.decode(MessageCodec.encode(new Message() {
-
- @Override
- public Type getType() {
- return Type.SYSTEM;
- }
-
- @Override
- public Scope getScope() {
- return Scope.APPLICATION;
- }
-
- @Override
- public String getRunnableName() {
- return null;
- }
-
- @Override
- public Command getCommand() {
- return new Command() {
- @Override
- public String getCommand() {
- return "stop";
- }
-
- @Override
- public Map<String, String> getOptions() {
- return ImmutableMap.of("timeout", "1", "timeoutUnit", "SECONDS");
- }
- };
- }
- }));
-
- Assert.assertEquals(Message.Type.SYSTEM, message.getType());
- Assert.assertEquals(Message.Scope.APPLICATION, message.getScope());
- Assert.assertNull(message.getRunnableName());
- Assert.assertEquals("stop", message.getCommand().getCommand());
- Assert.assertEquals(ImmutableMap.of("timeout", "1", "timeoutUnit", "SECONDS"), message.getCommand().getOptions());
- }
-
- @Test
- public void testFailureDecode() {
- Assert.assertNull(MessageCodec.decode("".getBytes()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java b/core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
deleted file mode 100644
index 47d8562..0000000
--- a/core/src/test/java/org/apache/twill/internal/state/ZKServiceDecoratorTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.ZKServiceDecorator;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Suppliers;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Service;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- *
- */
-public class ZKServiceDecoratorTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecoratorTest.class);
-
- @Test
- public void testStateTransition() throws InterruptedException, ExecutionException, TimeoutException {
- InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
-
- try {
- final String namespace = Joiner.on('/').join("/twill", RunIds.generate(), "runnables", "Runner1");
-
- final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClient.startAndWait();
- zkClient.create(namespace, null, CreateMode.PERSISTENT).get();
-
- try {
- JsonObject content = new JsonObject();
- content.addProperty("containerId", "container-123");
- content.addProperty("host", "localhost");
-
- RunId runId = RunIds.generate();
- final Semaphore semaphore = new Semaphore(0);
- ZKServiceDecorator service = new ZKServiceDecorator(ZKClients.namespace(zkClient, namespace),
- runId, Suppliers.ofInstance(content),
- new AbstractIdleService() {
- @Override
- protected void startUp() throws Exception {
- Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to start");
- }
-
- @Override
- protected void shutDown() throws Exception {
- Preconditions.checkArgument(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Fail to stop");
- }
- });
-
- final String runnablePath = namespace + "/" + runId.getId();
- final AtomicReference<String> stateMatch = new AtomicReference<String>("STARTING");
- watchDataChange(zkClient, runnablePath + "/state", semaphore, stateMatch);
- Assert.assertEquals(Service.State.RUNNING, service.start().get(5, TimeUnit.SECONDS));
-
- stateMatch.set("STOPPING");
- Assert.assertEquals(Service.State.TERMINATED, service.stop().get(5, TimeUnit.SECONDS));
-
- } finally {
- zkClient.stopAndWait();
- }
- } finally {
- zkServer.stopAndWait();
- }
- }
-
- private void watchDataChange(final ZKClientService zkClient, final String path,
- final Semaphore semaphore, final AtomicReference<String> stateMatch) {
- Futures.addCallback(zkClient.getData(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (event.getType() == Event.EventType.NodeDataChanged) {
- watchDataChange(zkClient, path, semaphore, stateMatch);
- }
- }
- }), new FutureCallback<NodeData>() {
- @Override
- public void onSuccess(NodeData result) {
- String content = new String(result.getData(), Charsets.UTF_8);
- JsonObject json = new Gson().fromJson(content, JsonElement.class).getAsJsonObject();
- if (stateMatch.get().equals(json.get("state").getAsString())) {
- semaphore.release();
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- exists();
- }
-
- private void exists() {
- Futures.addCallback(zkClient.exists(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (event.getType() == Event.EventType.NodeCreated) {
- watchDataChange(zkClient, path, semaphore, stateMatch);
- }
- }
- }), new FutureCallback<Stat>() {
- @Override
- public void onSuccess(Stat result) {
- if (result != null) {
- watchDataChange(zkClient, path, semaphore, stateMatch);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error(t.getMessage(), t);
- }
- });
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
deleted file mode 100644
index 508cadb..0000000
--- a/core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import org.apache.twill.filesystem.LocalLocationFactory;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.ApplicationBundler;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
-
-/**
- *
- */
-public class ApplicationBundlerTest {
-
- @Rule
- public TemporaryFolder tmpDir = new TemporaryFolder();
-
- @Test
- public void testFindDependencies() throws IOException, ClassNotFoundException {
- Location location = new LocalLocationFactory(tmpDir.newFolder()).create("test.jar");
-
- // Create a jar file with by tracing dependency
- ApplicationBundler bundler = new ApplicationBundler(ImmutableList.<String>of());
- bundler.createBundle(location, ApplicationBundler.class);
-
- File targetDir = tmpDir.newFolder();
- unjar(new File(location.toURI()), targetDir);
-
- // Load the class back, it should be loaded by the custom classloader
- ClassLoader classLoader = createClassLoader(targetDir);
- Class<?> clz = classLoader.loadClass(ApplicationBundler.class.getName());
- Assert.assertSame(classLoader, clz.getClassLoader());
-
- // For system classes, they shouldn't be packaged, hence loaded by different classloader.
- clz = classLoader.loadClass(Object.class.getName());
- Assert.assertNotSame(classLoader, clz.getClassLoader());
- }
-
- private void unjar(File jarFile, File targetDir) throws IOException {
- JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
- try {
- JarEntry jarEntry = jarInput.getNextJarEntry();
- while (jarEntry != null) {
- File target = new File(targetDir, jarEntry.getName());
- if (jarEntry.isDirectory()) {
- target.mkdirs();
- } else {
- target.getParentFile().mkdirs();
- ByteStreams.copy(jarInput, Files.newOutputStreamSupplier(target));
- }
-
- jarEntry = jarInput.getNextJarEntry();
- }
- } finally {
- jarInput.close();
- }
- }
-
- private ClassLoader createClassLoader(File dir) throws MalformedURLException {
- List<URL> urls = Lists.newArrayList();
- urls.add(new File(dir, "classes").toURI().toURL());
- File[] libFiles = new File(dir, "lib").listFiles();
- if (libFiles != null) {
- for (File file : libFiles) {
- urls.add(file.toURI().toURL());
- }
- }
- return new URLClassLoader(urls.toArray(new URL[0])) {
- @Override
- protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
- // Load class from the given URLs first before delegating to parent.
- try {
- return super.findClass(name);
- } catch (ClassNotFoundException e) {
- ClassLoader parent = getParent();
- return parent == null ? ClassLoader.getSystemClassLoader().loadClass(name) : parent.loadClass(name);
- }
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
deleted file mode 100644
index 40fc3ed..0000000
--- a/core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import org.apache.twill.common.Services;
-import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.kafka.client.Compression;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.Futures;
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.ArchiveException;
-import org.apache.commons.compress.archivers.ArchiveInputStream;
-import org.apache.commons.compress.archivers.ArchiveStreamFactory;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class KafkaTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
-
- @ClassRule
- public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
-
- private static InMemoryZKServer zkServer;
- private static EmbeddedKafkaServer kafkaServer;
- private static ZKClientService zkClientService;
- private static KafkaClient kafkaClient;
-
- @BeforeClass
- public static void init() throws Exception {
- zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
- zkServer.startAndWait();
-
- // Extract the kafka.tgz and start the kafka server
- kafkaServer = new EmbeddedKafkaServer(extractKafka(), generateKafkaConfig(zkServer.getConnectionStr()));
- kafkaServer.startAndWait();
-
- zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
-
- kafkaClient = new SimpleKafkaClient(zkClientService);
- Services.chainStart(zkClientService, kafkaClient).get();
- }
-
- @AfterClass
- public static void finish() throws Exception {
- Services.chainStop(kafkaClient, zkClientService).get();
- kafkaServer.stopAndWait();
- zkServer.stopAndWait();
- }
-
- @Test
- public void testKafkaClient() throws Exception {
- String topic = "testClient";
-
- Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
- Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10);
-
- t1.start();
- t2.start();
-
- Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10);
- t2.join();
- t3.start();
-
- Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, 0, 1048576);
- int count = 0;
- long startTime = System.nanoTime();
- while (count < 30 && consumer.hasNext() && secondsPassed(startTime, TimeUnit.NANOSECONDS) < 5) {
- LOG.info(Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
- count++;
- }
-
- Assert.assertEquals(30, count);
- }
-
- @Test (timeout = 10000)
- public void testOffset() throws Exception {
- String topic = "testOffset";
-
- // Initial earliest offset should be 0.
- long[] offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
- Assert.assertArrayEquals(new long[]{0L}, offsets);
-
- // Publish some messages
- Thread publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 2000);
- publishThread.start();
- publishThread.join();
-
- // Fetch earliest offset, should still be 0.
- offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
- Assert.assertArrayEquals(new long[]{0L}, offsets);
-
- // Fetch latest offset
- offsets = kafkaClient.getOffset(topic, 0, -1, 10).get();
- Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, offsets[0], 1048576);
-
- // Publish one more message, the consumer should see the new message being published.
- publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 1, 3000);
- publishThread.start();
- publishThread.join();
-
- // Should see the last message being published.
- Assert.assertTrue(consumer.hasNext());
- Assert.assertEquals("3000 Testing", Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
- }
-
- private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
- final Compression compression, final String message, final int count) {
- return createPublishThread(kafkaClient, topic, compression, message, count, 0);
- }
-
- private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
- final String message, final int count, final int base) {
- return new Thread() {
- public void run() {
- PreparePublish preparePublish = kafkaClient.preparePublish(topic, compression);
- for (int i = 0; i < count; i++) {
- preparePublish.add(((base + i) + " " + message).getBytes(Charsets.UTF_8), 0);
- }
- Futures.getUnchecked(preparePublish.publish());
- }
- };
- }
-
- private long secondsPassed(long startTime, TimeUnit startUnit) {
- return TimeUnit.SECONDS.convert(System.nanoTime() - TimeUnit.NANOSECONDS.convert(startTime, startUnit),
- TimeUnit.NANOSECONDS);
- }
-
- private static File extractKafka() throws IOException, ArchiveException, CompressorException {
- File kafkaExtract = TMP_FOLDER.newFolder();
- InputStream kakfaResource = KafkaTest.class.getClassLoader().getResourceAsStream("kafka-0.7.2.tgz");
- ArchiveInputStream archiveInput = new ArchiveStreamFactory()
- .createArchiveInputStream(ArchiveStreamFactory.TAR,
- new CompressorStreamFactory()
- .createCompressorInputStream(CompressorStreamFactory.GZIP, kakfaResource));
-
- try {
- ArchiveEntry entry = archiveInput.getNextEntry();
- while (entry != null) {
- File file = new File(kafkaExtract, entry.getName());
- if (entry.isDirectory()) {
- file.mkdirs();
- } else {
- ByteStreams.copy(archiveInput, Files.newOutputStreamSupplier(file));
- }
- entry = archiveInput.getNextEntry();
- }
- } finally {
- archiveInput.close();
- }
- return kafkaExtract;
- }
-
- private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
- int port = Networks.getRandomPort();
- Preconditions.checkState(port > 0, "Failed to get random port.");
-
- Properties prop = new Properties();
- prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
- prop.setProperty("zk.connect", zkConnectStr);
- prop.setProperty("num.threads", "8");
- prop.setProperty("port", Integer.toString(port));
- prop.setProperty("log.flush.interval", "1000");
- prop.setProperty("max.socket.request.bytes", "104857600");
- prop.setProperty("log.cleanup.interval.mins", "1");
- prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
- prop.setProperty("zk.connectiontimeout.ms", "1000000");
- prop.setProperty("socket.receive.buffer", "1048576");
- prop.setProperty("enable.zookeeper", "true");
- prop.setProperty("log.retention.hours", "24");
- prop.setProperty("brokerid", "0");
- prop.setProperty("socket.send.buffer", "1048576");
- prop.setProperty("num.partitions", "1");
- // Use a really small file size to force some flush to happen
- prop.setProperty("log.file.size", "1024");
- prop.setProperty("log.default.flush.interval.ms", "1000");
- return prop;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/logback-test.xml b/core/src/test/resources/logback-test.xml
deleted file mode 100644
index 3c36660..0000000
--- a/core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!-- Default logback configuration for twill library -->
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
- </encoder>
- </appender>
-
- <logger name="org.apache.hadoop" level="WARN" />
- <logger name="org.apache.zookeeper" level="WARN" />
-
- <root level="INFO">
- <appender-ref ref="STDOUT"/>
- </root>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/discovery-api/pom.xml b/discovery-api/pom.xml
deleted file mode 100644
index e41b214..0000000
--- a/discovery-api/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>twill-parent</artifactId>
- <groupId>org.apache.twill</groupId>
- <version>0.1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>twill-discovery-api</artifactId>
- <name>Twill discovery service API</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
----------------------------------------------------------------------
diff --git a/discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java b/discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
deleted file mode 100644
index a5529fe..0000000
--- a/discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.twill.discovery;
-
-import java.net.InetSocketAddress;
-
-/**
- * Discoverable defines the attributes of service to be discovered.
- */
-public interface Discoverable {
-
- /**
- * @return Name of the service
- */
- String getName();
-
- /**
- * @return An {@link InetSocketAddress} representing the host+port of the service.
- */
- InetSocketAddress getSocketAddress();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
----------------------------------------------------------------------
diff --git a/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java b/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
deleted file mode 100644
index a26fff8..0000000
--- a/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-
-import org.apache.twill.common.Cancellable;
-
-/**
- * DiscoveryService defines interface for registering {@link Discoverable}.
- */
-public interface DiscoveryService {
-
- /**
- * Registers a {@link Discoverable} service.
- * @param discoverable Information of the service provider that could be discovered.
- * @return A {@link Cancellable} for un-registration.
- */
- Cancellable register(Discoverable discoverable);
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
deleted file mode 100644
index 89cf269..0000000
--- a/discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-/**
- * Interface for {@link DiscoveryServiceClient} to discover services registered with {@link DiscoveryService}.
- */
-public interface DiscoveryServiceClient {
-
- /**
- * Retrieves a list of {@link Discoverable} for the a service with the given name.
- *
- * @param name Name of the service
- * @return A live {@link Iterable} that on each call to {@link Iterable#iterator()} returns
- * an {@link java.util.Iterator Iterator} that reflects the latest set of
- * available {@link Discoverable} services.
- */
- Iterable<Discoverable> discover(String name);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/discovery-core/pom.xml b/discovery-core/pom.xml
deleted file mode 100644
index 2612138..0000000
--- a/discovery-core/pom.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>twill-parent</artifactId>
- <groupId>org.apache.twill</groupId>
- <version>0.1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>twill-discovery-core</artifactId>
- <name>Twill discovery service implementations</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-discovery-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-zookeeper</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java b/discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
deleted file mode 100644
index 5fa97d1..0000000
--- a/discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import java.net.InetSocketAddress;
-
-/**
- * Wrapper for a discoverable.
- */
-final class DiscoverableWrapper implements Discoverable {
- private final String name;
- private final InetSocketAddress address;
-
- DiscoverableWrapper(Discoverable discoverable) {
- this.name = discoverable.getName();
- this.address = discoverable.getSocketAddress();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return address;
- }
-
- @Override
- public String toString() {
- return "{name=" + name + ", address=" + address;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Discoverable other = (Discoverable) o;
-
- return name.equals(other.getName()) && address.equals(other.getSocketAddress());
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
deleted file mode 100644
index 7a9e984..0000000
--- a/discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import org.apache.twill.common.Cancellable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
-
-import java.util.Iterator;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A simple in memory implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
- */
-public class InMemoryDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
-
- private final Multimap<String, Discoverable> services = HashMultimap.create();
- private final Lock lock = new ReentrantLock();
-
- @Override
- public Cancellable register(final Discoverable discoverable) {
- lock.lock();
- try {
- final Discoverable wrapper = new DiscoverableWrapper(discoverable);
- services.put(wrapper.getName(), wrapper);
- return new Cancellable() {
- @Override
- public void cancel() {
- lock.lock();
- try {
- services.remove(wrapper.getName(), wrapper);
- } finally {
- lock.unlock();
- }
- }
- };
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public Iterable<Discoverable> discover(final String name) {
- return new Iterable<Discoverable>() {
- @Override
- public Iterator<Discoverable> iterator() {
- lock.lock();
- try {
- return ImmutableList.copyOf(services.get(name)).iterator();
- } finally {
- lock.unlock();
- }
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
deleted file mode 100644
index e2f9bc0..0000000
--- a/discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClients;
-import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.base.Charsets;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Type;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
- * <p>
- * Discoverable services are registered within Zookeeper under the namespace 'discoverable' by default.
- * If you would like to change the namespace under which the services are registered then you can pass
- * in the namespace during construction of {@link ZKDiscoveryService}.
- * </p>
- *
- * <p>
- * Following is a simple example of how {@link ZKDiscoveryService} can be used for registering services
- * and also for discovering the registered services.
- * <blockquote>
- * <pre>
- * {@code
- *
- * DiscoveryService service = new ZKDiscoveryService(zkClient);
- * service.register(new Discoverable() {
- * @Override
- * public String getName() {
- * return 'service-name';
- * }
- *
- * @Override
- * public InetSocketAddress getSocketAddress() {
- * return new InetSocketAddress(hostname, port);
- * }
- * });
- * ...
- * ...
- * Iterable<Discoverable> services = service.discovery("service-name");
- * ...
- * }
- * </pre>
- * </blockquote>
- * </p>
- */
-public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
- private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
- private static final String NAMESPACE = "/discoverable";
-
- private static final long RETRY_MILLIS = 1000;
-
- // In memory map for recreating ephemeral nodes after session expires.
- // It map from discoverable to the corresponding Cancellable
- private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
- private final Lock lock;
-
- private final LoadingCache<String, Iterable<Discoverable>> services;
- private final ZKClient zkClient;
- private final ScheduledExecutorService retryExecutor;
-
- /**
- * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry.
- * @param zkClient The {@link ZKClient} for interacting with zookeeper.
- */
- public ZKDiscoveryService(ZKClient zkClient) {
- this(zkClient, NAMESPACE);
- }
-
- /**
- * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry under namepsace.
- * @param zkClient of zookeeper quorum
- * @param namespace under which the service registered would be stored in zookeeper.
- * If namespace is {@code null}, no namespace will be used.
- */
- public ZKDiscoveryService(ZKClient zkClient, String namespace) {
- this.discoverables = HashMultimap.create();
- this.lock = new ReentrantLock();
- this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
- Threads.createDaemonThreadFactory("zk-discovery-retry"));
- this.zkClient = namespace == null ? zkClient : ZKClients.namespace(zkClient, namespace);
- this.services = CacheBuilder.newBuilder().build(createServiceLoader());
- this.zkClient.addConnectionWatcher(createConnectionWatcher());
- }
-
- /**
- * Registers a {@link Discoverable} in zookeeper.
- * <p>
- * Registering a {@link Discoverable} will create a node <base>/<service-name>
- * in zookeeper as a ephemeral node. If the node already exists (timeout associated with emphemeral, then a runtime
- * exception is thrown to make sure that a service with an intent to register is not started without registering.
- * When a runtime is thrown, expectation is that the process being started with fail and would be started again
- * by the monitoring service.
- * </p>
- * @param discoverable Information of the service provider that could be discovered.
- * @return An instance of {@link Cancellable}
- */
- @Override
- public Cancellable register(final Discoverable discoverable) {
- final Discoverable wrapper = new DiscoverableWrapper(discoverable);
- final SettableFuture<String> future = SettableFuture.create();
- final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper);
-
- // Create the zk ephemeral node.
- Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() {
- @Override
- public void onSuccess(String result) {
- // Set the sequence node path to cancellable for future cancellation.
- cancellable.setPath(result);
- lock.lock();
- try {
- discoverables.put(wrapper, cancellable);
- } finally {
- lock.unlock();
- }
- LOG.debug("Service registered: {} {}", wrapper, result);
- future.set(result);
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (t instanceof KeeperException.NodeExistsException) {
- handleRegisterFailure(discoverable, future, this, t);
- } else {
- LOG.warn("Failed to register: {}", wrapper, t);
- future.setException(t);
- }
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- Futures.getUnchecked(future);
- return cancellable;
- }
-
- @Override
- public Iterable<Discoverable> discover(String service) {
- return services.getUnchecked(service);
- }
-
- /**
- * Handle registration failure.
- *
- * @param discoverable The discoverable to register.
- * @param completion A settable future to set when registration is completed / failed.
- * @param creationCallback A future callback for path creation.
- * @param failureCause The original cause of failure.
- */
- private void handleRegisterFailure(final Discoverable discoverable,
- final SettableFuture<String> completion,
- final FutureCallback<String> creationCallback,
- final Throwable failureCause) {
-
- final String path = getNodePath(discoverable);
- Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
- @Override
- public void onSuccess(Stat result) {
- if (result == null) {
- // If the node is gone, simply retry.
- LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
- retryRegister(discoverable, creationCallback);
- return;
- }
-
- long ephemeralOwner = result.getEphemeralOwner();
- if (ephemeralOwner == 0) {
- // it is not an ephemeral node, something wrong.
- LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.",
- path, discoverable);
- completion.setException(failureCause);
- return;
- }
- Long sessionId = zkClient.getSessionId();
- if (sessionId == null || ephemeralOwner != sessionId) {
- // This zkClient is not valid or doesn't own the ephemeral node, simply keep retrying.
- LOG.info("Owner of {} is different. Retry registration for {}.", path, discoverable);
- retryRegister(discoverable, creationCallback);
- } else {
- // This client owned the node, treat the registration as completed.
- // This could happen if same client tries to register twice (due to mistake or failure race condition).
- completion.set(path);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- // If exists call failed, simply retry creation.
- LOG.warn("Error when getting stats on {}. Retry registration for {}.", path, discoverable);
- retryRegister(discoverable, creationCallback);
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
-
- private OperationFuture<String> doRegister(Discoverable discoverable) {
- byte[] discoverableBytes = encode(discoverable);
- return zkClient.create(getNodePath(discoverable), discoverableBytes, CreateMode.EPHEMERAL, true);
- }
-
- private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
- retryExecutor.schedule(new Runnable() {
-
- @Override
- public void run() {
- Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);
- }
- }, RETRY_MILLIS, TimeUnit.MILLISECONDS);
- }
-
-
- /**
- * Generate unique node path for a given {@link Discoverable}.
- * @param discoverable An instance of {@link Discoverable}.
- * @return A node name based on the discoverable.
- */
- private String getNodePath(Discoverable discoverable) {
- InetSocketAddress socketAddress = discoverable.getSocketAddress();
- String node = Hashing.md5()
- .newHasher()
- .putBytes(socketAddress.getAddress().getAddress())
- .putInt(socketAddress.getPort())
- .hash().toString();
-
- return String.format("/%s/%s", discoverable.getName(), node);
- }
-
- private Watcher createConnectionWatcher() {
- return new Watcher() {
- // Watcher is invoked from single event thread, hence safe to use normal mutable variable.
- private boolean expired;
-
- @Override
- public void process(WatchedEvent event) {
- if (event.getState() == Event.KeeperState.Expired) {
- LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
- expired = true;
- } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
- LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
- expired = false;
-
- // Re-register all services
- lock.lock();
- try {
- for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
- LOG.info("Re-registering service: {}", entry.getKey());
-
- // Must be non-blocking in here.
- Futures.addCallback(doRegister(entry.getKey()), new FutureCallback<String>() {
- @Override
- public void onSuccess(String result) {
- // Updates the cancellable to the newly created sequential node.
- entry.getValue().setPath(result);
- LOG.debug("Service re-registered: {} {}", entry.getKey(), result);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // When failed to create the node, there would be no retry and simply make the cancellable do nothing.
- entry.getValue().setPath(null);
- LOG.error("Failed to re-register service: {}", entry.getKey(), t);
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
- } finally {
- lock.unlock();
- }
- }
- }
- };
- }
-
- /**
- * Creates a CacheLoader for creating live Iterable for watching instances changes for a given service.
- */
- private CacheLoader<String, Iterable<Discoverable>> createServiceLoader() {
- return new CacheLoader<String, Iterable<Discoverable>>() {
- @Override
- public Iterable<Discoverable> load(String service) throws Exception {
- // The atomic reference is to keep the resulting Iterable live. It always contains a
- // immutable snapshot of the latest detected set of Discoverable.
- final AtomicReference<Iterable<Discoverable>> iterable =
- new AtomicReference<Iterable<Discoverable>>(ImmutableList.<Discoverable>of());
- final String serviceBase = "/" + service;
-
- // Watch for children changes in /service
- ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
- @Override
- public void updated(NodeChildren nodeChildren) {
- // Fetch data of all children nodes in parallel.
- List<String> children = nodeChildren.getChildren();
- List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size());
- for (String child : children) {
- dataFutures.add(zkClient.getData(serviceBase + "/" + child));
- }
-
- // Update the service map when all fetching are done.
- final ListenableFuture<List<NodeData>> fetchFuture = Futures.successfulAsList(dataFutures);
- fetchFuture.addListener(new Runnable() {
- @Override
- public void run() {
- ImmutableList.Builder<Discoverable> builder = ImmutableList.builder();
- for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) {
- // For successful fetch, decode the content.
- if (nodeData != null) {
- Discoverable discoverable = decode(nodeData.getData());
- if (discoverable != null) {
- builder.add(discoverable);
- }
- }
- }
- iterable.set(builder.build());
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
- });
-
- return new Iterable<Discoverable>() {
- @Override
- public Iterator<Discoverable> iterator() {
- return iterable.get().iterator();
- }
- };
- }
- };
- }
-
- /**
- * Static helper function for decoding array of bytes into a {@link DiscoverableWrapper} object.
- * @param bytes representing serialized {@link DiscoverableWrapper}
- * @return null if bytes are null; else an instance of {@link DiscoverableWrapper}
- */
- private static Discoverable decode(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- String content = new String(bytes, Charsets.UTF_8);
- return new GsonBuilder().registerTypeAdapter(Discoverable.class, new DiscoverableCodec())
- .create()
- .fromJson(content, Discoverable.class);
- }
-
- /**
- * Static helper function for encoding an instance of {@link DiscoverableWrapper} into array of bytes.
- * @param discoverable An instance of {@link Discoverable}
- * @return array of bytes representing an instance of <code>discoverable</code>
- */
- private static byte[] encode(Discoverable discoverable) {
- return new GsonBuilder().registerTypeAdapter(DiscoverableWrapper.class, new DiscoverableCodec())
- .create()
- .toJson(discoverable, DiscoverableWrapper.class)
- .getBytes(Charsets.UTF_8);
- }
-
- /**
- * Inner class for cancelling (un-register) discovery service.
- */
- private final class DiscoveryCancellable implements Cancellable {
-
- private final Discoverable discoverable;
- private final AtomicBoolean cancelled;
- private volatile String path;
-
- DiscoveryCancellable(Discoverable discoverable) {
- this.discoverable = discoverable;
- this.cancelled = new AtomicBoolean();
- }
-
- /**
- * Set the zk node path representing the ephemeral sequence node of this registered discoverable.
- * Called from ZK event thread when creating of the node completed, either from normal registration or
- * re-registration due to session expiration.
- *
- * @param path The path to ephemeral sequence node.
- */
- void setPath(String path) {
- this.path = path;
- if (cancelled.get() && path != null) {
- // Simply delete the path if it's already cancelled
- // It's for the case when session expire happened and re-registration completed after this has been cancelled.
- // Not bother with the result as if there is error, nothing much we could do.
- zkClient.delete(path);
- }
- }
-
- @Override
- public void cancel() {
- if (!cancelled.compareAndSet(false, true)) {
- return;
- }
-
- // Take a snapshot of the volatile path.
- String path = this.path;
-
- // If it is null, meaning cancel() is called before the ephemeral node is created, hence
- // setPath() will be called in future (through zk callback when creation is completed)
- // so that deletion will be done in setPath().
- if (path == null) {
- return;
- }
-
- // Remove this Cancellable from the map so that upon session expiration won't try to register.
- lock.lock();
- try {
- discoverables.remove(discoverable, this);
- } finally {
- lock.unlock();
- }
-
- // Delete the path. It's ok if the path not exists
- // (e.g. what session expired and before node has been re-created)
- Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
- KeeperException.NoNodeException.class, path));
- LOG.debug("Service unregistered: {} {}", discoverable, path);
- }
- }
-
- /**
- * SerDe for converting a {@link DiscoverableWrapper} into a JSON object
- * or from a JSON object into {@link DiscoverableWrapper}.
- */
- private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
-
- @Override
- public Discoverable deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- final String service = jsonObj.get("service").getAsString();
- String hostname = jsonObj.get("hostname").getAsString();
- int port = jsonObj.get("port").getAsInt();
- final InetSocketAddress address = new InetSocketAddress(hostname, port);
- return new Discoverable() {
- @Override
- public String getName() {
- return service;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return address;
- }
- };
- }
-
- @Override
- public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject jsonObj = new JsonObject();
- jsonObj.addProperty("service", src.getName());
- jsonObj.addProperty("hostname", src.getSocketAddress().getHostName());
- jsonObj.addProperty("port", src.getSocketAddress().getPort());
- return jsonObj;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/main/java/org/apache/twill/discovery/package-info.java b/discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
deleted file mode 100644
index a1d6e0c..0000000
--- a/discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Classes in this package provides service discovery implementations.
- */
-package org.apache.twill.discovery;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java b/discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
deleted file mode 100644
index d8cc375..0000000
--- a/discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.twill.discovery;
-
-import org.apache.twill.common.Cancellable;
-import com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test memory based service discovery service.
- */
-public class InMemoryDiscoveryServiceTest {
- private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
- return service.register(new Discoverable() {
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress(host, port);
- }
- });
- }
-
- @Test
- public void simpleDiscoverable() throws Exception {
- DiscoveryService discoveryService = new InMemoryDiscoveryService();
- DiscoveryServiceClient discoveryServiceClient = (DiscoveryServiceClient) discoveryService;
-
- // Register one service running on one host:port
- Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
-
- // Discover that registered host:port.
- Assert.assertTrue(Iterables.size(discoverables) == 1);
-
- // Remove the service
- cancellable.cancel();
-
- // There should be no service.
- discoverables = discoveryServiceClient.discover("foo");
- TimeUnit.MILLISECONDS.sleep(100);
- Assert.assertTrue(Iterables.size(discoverables) == 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
deleted file mode 100644
index feee8db..0000000
--- a/discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.discovery;
-
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.common.Services;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.internal.zookeeper.KillZKSession;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test Zookeeper based discovery service.
- */
-public class ZKDiscoveryServiceTest {
- private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryServiceTest.class);
-
- private static InMemoryZKServer zkServer;
- private static ZKClientService zkClient;
-
- @BeforeClass
- public static void beforeClass() {
- zkServer = InMemoryZKServer.builder().setTickTime(100000).build();
- zkServer.startAndWait();
-
- zkClient = ZKClientServices.delegate(
- ZKClients.retryOnFailure(
- ZKClients.reWatchOnExpire(
- ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
- zkClient.startAndWait();
- }
-
- @AfterClass
- public static void afterClass() {
- Futures.getUnchecked(Services.chainStop(zkClient, zkServer));
- }
-
- private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
- return service.register(new Discoverable() {
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public InetSocketAddress getSocketAddress() {
- return new InetSocketAddress(host, port);
- }
- });
- }
-
-
- private boolean waitTillExpected(int expected, Iterable<Discoverable> discoverables) throws Exception {
- for (int i = 0; i < 10; ++i) {
- TimeUnit.MILLISECONDS.sleep(10);
- if (Iterables.size(discoverables) == expected) {
- return true;
- }
- }
- return (Iterables.size(discoverables) == expected);
- }
-
- @Test (timeout = 5000)
- public void testDoubleRegister() throws Exception {
- ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = discoveryService;
-
- // Register on the same host port, it shouldn't fail.
- Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
- Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
-
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_double_reg");
-
- Assert.assertTrue(waitTillExpected(1, discoverables));
-
- cancellable.cancel();
- cancellable2.cancel();
-
- // Register again with two different clients, but killing session of the first one.
- final ZKClientService zkClient2 = ZKClientServices.delegate(
- ZKClients.retryOnFailure(
- ZKClients.reWatchOnExpire(
- ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
- zkClient2.startAndWait();
-
- try {
- ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
- cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
-
- // Schedule a thread to shutdown zkClient2.
- new Thread() {
- @Override
- public void run() {
- try {
- TimeUnit.SECONDS.sleep(2);
- zkClient2.stopAndWait();
- } catch (InterruptedException e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }.start();
-
- // This call would block until zkClient2 is shutdown.
- cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
- cancellable.cancel();
-
- } finally {
- zkClient2.stopAndWait();
- }
- }
-
- @Test
- public void testSessionExpires() throws Exception {
- ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = discoveryService;
-
- Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
-
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_expires");
-
- // Discover that registered host:port.
- Assert.assertTrue(waitTillExpected(1, discoverables));
-
- KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 5000);
-
- // Register one more endpoint to make sure state has been reflected after reconnection
- Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
-
- // Reconnection would trigger re-registration.
- Assert.assertTrue(waitTillExpected(2, discoverables));
-
- cancellable.cancel();
- cancellable2.cancel();
-
- // Verify that both are now gone.
- Assert.assertTrue(waitTillExpected(0, discoverables));
- }
-
- @Test
- public void simpleDiscoverable() throws Exception {
- DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
- // Register one service running on one host:port
- Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
-
- // Discover that registered host:port.
- Assert.assertTrue(waitTillExpected(1, discoverables));
-
- // Remove the service
- cancellable.cancel();
-
- // There should be no service.
-
- discoverables = discoveryServiceClient.discover("foo");
-
- Assert.assertTrue(waitTillExpected(0, discoverables));
- }
-
- @Test
- public void manySameDiscoverable() throws Exception {
- List<Cancellable> cancellables = Lists.newArrayList();
- DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
- cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
-
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("manyDiscoverable");
- Assert.assertTrue(waitTillExpected(5, discoverables));
-
- for (int i = 0; i < 5; i++) {
- cancellables.get(i).cancel();
- Assert.assertTrue(waitTillExpected(4 - i, discoverables));
- }
- }
-
- @Test
- public void multiServiceDiscoverable() throws Exception {
- List<Cancellable> cancellables = Lists.newArrayList();
- DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
- DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
-
- cancellables.add(register(discoveryService, "service1", "localhost", 1));
- cancellables.add(register(discoveryService, "service1", "localhost", 2));
- cancellables.add(register(discoveryService, "service1", "localhost", 3));
- cancellables.add(register(discoveryService, "service1", "localhost", 4));
- cancellables.add(register(discoveryService, "service1", "localhost", 5));
-
- cancellables.add(register(discoveryService, "service2", "localhost", 1));
- cancellables.add(register(discoveryService, "service2", "localhost", 2));
- cancellables.add(register(discoveryService, "service2", "localhost", 3));
-
- cancellables.add(register(discoveryService, "service3", "localhost", 1));
- cancellables.add(register(discoveryService, "service3", "localhost", 2));
-
- Iterable<Discoverable> discoverables = discoveryServiceClient.discover("service1");
- Assert.assertTrue(waitTillExpected(5, discoverables));
-
- discoverables = discoveryServiceClient.discover("service2");
- Assert.assertTrue(waitTillExpected(3, discoverables));
-
- discoverables = discoveryServiceClient.discover("service3");
- Assert.assertTrue(waitTillExpected(2, discoverables));
-
- cancellables.add(register(discoveryService, "service3", "localhost", 3));
- Assert.assertTrue(waitTillExpected(3, discoverables)); // Shows live iterator.
-
- for (Cancellable cancellable : cancellables) {
- cancellable.cancel();
- }
-
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service1")));
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service2")));
- Assert.assertTrue(waitTillExpected(0, discoveryServiceClient.discover("service3")));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/discovery-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/discovery-core/src/test/resources/logback-test.xml b/discovery-core/src/test/resources/logback-test.xml
deleted file mode 100644
index 2615cb4..0000000
--- a/discovery-core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!-- Default logback configuration for twill library -->
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
- </encoder>
- </appender>
-
- <logger name="org.apache.twill" level="DEBUG" />
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
-
-</configuration>