You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:45 UTC
[03/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
deleted file mode 100644
index 131f90a..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillApplication;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.internal.EnvKeys;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.common.io.LineReader;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.Socket;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Using echo server to test resource reports.
- * This test is executed by {@link org.apache.twill.yarn.YarnTestSuite}.
- */
-public class ResourceReportTestRun {
-
- private static final Logger LOG = LoggerFactory.getLogger(ResourceReportTestRun.class);
-
- private class ResourceApplication implements TwillApplication {
- @Override
- public TwillSpecification configure() {
- return TwillSpecification.Builder.with()
- .setName("ResourceApplication")
- .withRunnable()
- .add("echo1", new EchoServer(), ResourceSpecification.Builder.with()
- .setVirtualCores(1)
- .setMemory(128, ResourceSpecification.SizeUnit.MEGA)
- .setInstances(2).build()).noLocalFiles()
- .add("echo2", new EchoServer(), ResourceSpecification.Builder.with()
- .setVirtualCores(2)
- .setMemory(256, ResourceSpecification.SizeUnit.MEGA)
- .setInstances(1).build()).noLocalFiles()
- .anyOrder()
- .build();
- }
- }
-
- @Test
- public void testRunnablesGetAllowedResourcesInEnv() throws InterruptedException, IOException,
- TimeoutException, ExecutionException {
- TwillRunner runner = YarnTestSuite.getTwillRunner();
-
- ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
- .setVirtualCores(1)
- .setMemory(2048, ResourceSpecification.SizeUnit.MEGA)
- .setInstances(1)
- .build();
- TwillController controller = runner.prepare(new EnvironmentEchoServer(), resourceSpec)
- .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
- .withApplicationArguments("envecho")
- .withArguments("EnvironmentEchoServer", "echo2")
- .start();
-
- final CountDownLatch running = new CountDownLatch(1);
- controller.addListener(new ServiceListenerAdapter() {
- @Override
- public void running() {
- running.countDown();
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
-
- Iterable<Discoverable> envEchoServices = controller.discoverService("envecho");
- Assert.assertTrue(YarnTestSuite.waitForSize(envEchoServices, 1, 30));
-
- // TODO: check virtual cores once yarn adds the ability
- Map<String, String> expectedValues = Maps.newHashMap();
- expectedValues.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, "2048");
- expectedValues.put(EnvKeys.TWILL_INSTANCE_COUNT, "1");
-
- // check environment of the runnable.
- Discoverable discoverable = envEchoServices.iterator().next();
- for (Map.Entry<String, String> expected : expectedValues.entrySet()) {
- Socket socket = new Socket(discoverable.getSocketAddress().getHostName(),
- discoverable.getSocketAddress().getPort());
- try {
- PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
- LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
- writer.println(expected.getKey());
- Assert.assertEquals(expected.getValue(), reader.readLine());
- } finally {
- socket.close();
- }
- }
-
- controller.stop().get(30, TimeUnit.SECONDS);
- // Sleep a bit before exiting.
- TimeUnit.SECONDS.sleep(2);
- }
-
- @Test
- public void testResourceReportWithFailingContainers() throws InterruptedException, IOException,
- TimeoutException, ExecutionException {
- TwillRunner runner = YarnTestSuite.getTwillRunner();
-
- ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
- .setVirtualCores(1)
- .setMemory(128, ResourceSpecification.SizeUnit.MEGA)
- .setInstances(2)
- .build();
- TwillController controller = runner.prepare(new BuggyServer(), resourceSpec)
- .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
- .withApplicationArguments("echo")
- .withArguments("BuggyServer", "echo2")
- .start();
-
- final CountDownLatch running = new CountDownLatch(1);
- controller.addListener(new ServiceListenerAdapter() {
- @Override
- public void running() {
- running.countDown();
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
-
- Iterable<Discoverable> echoServices = controller.discoverService("echo");
- Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
- // check that we have 2 runnables.
- ResourceReport report = controller.getResourceReport();
- Assert.assertEquals(2, report.getRunnableResources("BuggyServer").size());
-
- // cause a divide by 0 in one server
- Discoverable discoverable = echoServices.iterator().next();
- Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
- discoverable.getSocketAddress().getPort());
- try {
- PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
- writer.println("0");
- } finally {
- socket.close();
- }
-
- // takes some time for app master to find out the container completed...
- TimeUnit.SECONDS.sleep(5);
- // check that we have 1 runnable, not 2.
- report = controller.getResourceReport();
- Assert.assertEquals(1, report.getRunnableResources("BuggyServer").size());
-
- controller.stop().get(30, TimeUnit.SECONDS);
- // Sleep a bit before exiting.
- TimeUnit.SECONDS.sleep(2);
- }
-
- @Test
- public void testResourceReport() throws InterruptedException, ExecutionException, IOException,
- URISyntaxException, TimeoutException {
- TwillRunner runner = YarnTestSuite.getTwillRunner();
-
- TwillController controller = runner.prepare(new ResourceApplication())
- .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
- .withApplicationArguments("echo")
- .withArguments("echo1", "echo1")
- .withArguments("echo2", "echo2")
- .start();
-
- final CountDownLatch running = new CountDownLatch(1);
- controller.addListener(new ServiceListenerAdapter() {
- @Override
- public void running() {
- running.countDown();
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
-
- // wait for 3 echo servers to come up
- Iterable<Discoverable> echoServices = controller.discoverService("echo");
- Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 3, 60));
- ResourceReport report = controller.getResourceReport();
- // make sure resources for echo1 and echo2 are there
- Map<String, Collection<TwillRunResources>> usedResources = report.getResources();
- Assert.assertEquals(2, usedResources.keySet().size());
- Assert.assertTrue(usedResources.containsKey("echo1"));
- Assert.assertTrue(usedResources.containsKey("echo2"));
-
- Collection<TwillRunResources> echo1Resources = usedResources.get("echo1");
- // 2 instances of echo1
- Assert.assertEquals(2, echo1Resources.size());
- // TODO: check cores after hadoop-2.1.0
- for (TwillRunResources resources : echo1Resources) {
- Assert.assertEquals(128, resources.getMemoryMB());
- }
-
- Collection<TwillRunResources> echo2Resources = usedResources.get("echo2");
- // 2 instances of echo1
- Assert.assertEquals(1, echo2Resources.size());
- // TODO: check cores after hadoop-2.1.0
- for (TwillRunResources resources : echo2Resources) {
- Assert.assertEquals(256, resources.getMemoryMB());
- }
-
- // Decrease number of instances of echo1 from 2 to 1
- controller.changeInstances("echo1", 1);
- echoServices = controller.discoverService("echo1");
- Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 1, 60));
- report = controller.getResourceReport();
-
- // make sure resources for echo1 and echo2 are there
- usedResources = report.getResources();
- Assert.assertEquals(2, usedResources.keySet().size());
- Assert.assertTrue(usedResources.containsKey("echo1"));
- Assert.assertTrue(usedResources.containsKey("echo2"));
-
- echo1Resources = usedResources.get("echo1");
- // 1 instance of echo1 now
- Assert.assertEquals(1, echo1Resources.size());
- // TODO: check cores after hadoop-2.1.0
- for (TwillRunResources resources : echo1Resources) {
- Assert.assertEquals(128, resources.getMemoryMB());
- }
-
- echo2Resources = usedResources.get("echo2");
- // 2 instances of echo1
- Assert.assertEquals(1, echo2Resources.size());
- // TODO: check cores after hadoop-2.1.0
- for (TwillRunResources resources : echo2Resources) {
- Assert.assertEquals(256, resources.getMemoryMB());
- }
-
- controller.stop().get(30, TimeUnit.SECONDS);
- // Sleep a bit before exiting.
- TimeUnit.SECONDS.sleep(2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
deleted file mode 100644
index 5148ed2..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.TwillContext;
-import org.apache.twill.api.TwillContext;
-import org.apache.twill.common.Cancellable;
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.util.List;
-
-/**
- * Boilerplate for a server that announces itself and talks to clients through a socket.
- */
-public abstract class SocketServer extends AbstractTwillRunnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(SocketServer.class);
-
- protected volatile boolean running;
- protected volatile Thread runThread;
- protected ServerSocket serverSocket;
- protected Cancellable canceller;
-
- @Override
- public void initialize(TwillContext context) {
- super.initialize(context);
- running = true;
- try {
- serverSocket = new ServerSocket(0);
- LOG.info("Server started: " + serverSocket.getLocalSocketAddress() +
- ", id: " + context.getInstanceId() +
- ", count: " + context.getInstanceCount());
-
- final List<Cancellable> cancellables = ImmutableList.of(
- context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()),
- context.announce(context.getArguments()[0], serverSocket.getLocalPort())
- );
- canceller = new Cancellable() {
- @Override
- public void cancel() {
- for (Cancellable c : cancellables) {
- c.cancel();
- }
- }
- };
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public void run() {
- try {
- runThread = Thread.currentThread();
- while (running) {
- try {
- Socket socket = serverSocket.accept();
- try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
- PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
- handleRequest(reader, writer);
- } finally {
- socket.close();
- }
- } catch (SocketException e) {
- LOG.info("Socket exception: " + e);
- }
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- @Override
- public void stop() {
- LOG.info("Stopping server");
- canceller.cancel();
- running = false;
- Thread t = runThread;
- if (t != null) {
- t.interrupt();
- }
- try {
- serverSocket.close();
- } catch (IOException e) {
- LOG.error("Exception while closing socket.", e);
- throw Throwables.propagate(e);
- }
- serverSocket = null;
- }
-
- @Override
- public void destroy() {
- try {
- if (serverSocket != null) {
- serverSocket.close();
- }
- } catch (IOException e) {
- LOG.error("Exception while closing socket.", e);
- throw Throwables.propagate(e);
- }
- }
-
- abstract public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java b/yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
deleted file mode 100644
index 5a93271..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.logging.PrinterLogHandler;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Service;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.PrintWriter;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Testing application master will shutdown itself when all tasks are completed.
- * This test is executed by {@link YarnTestSuite}.
- */
-public class TaskCompletedTestRun {
-
- public static final class SleepTask extends AbstractTwillRunnable {
-
- @Override
- public void run() {
- // Randomly sleep for 3-5 seconds.
- try {
- TimeUnit.SECONDS.sleep(new Random().nextInt(3) + 3);
- } catch (InterruptedException e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public void stop() {
- // No-op
- }
- }
-
- @Test
- public void testTaskCompleted() throws InterruptedException {
- TwillRunner twillRunner = YarnTestSuite.getTwillRunner();
- TwillController controller = twillRunner.prepare(new SleepTask(),
- ResourceSpecification.Builder.with()
- .setVirtualCores(1)
- .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
- .setInstances(3).build())
- .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
- .start();
-
- final CountDownLatch runLatch = new CountDownLatch(1);
- final CountDownLatch stopLatch = new CountDownLatch(1);
- controller.addListener(new ServiceListenerAdapter() {
-
- @Override
- public void running() {
- runLatch.countDown();
- }
-
- @Override
- public void terminated(Service.State from) {
- stopLatch.countDown();
- }
- }, Threads.SAME_THREAD_EXECUTOR);
-
- Assert.assertTrue(runLatch.await(1, TimeUnit.MINUTES));
-
- Assert.assertTrue(stopLatch.await(1, TimeUnit.MINUTES));
-
- TimeUnit.SECONDS.sleep(2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java b/yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
deleted file mode 100644
index 8be907b..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.TwillSpecification;
-import com.google.common.collect.ImmutableSet;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- *
- */
-public class TwillSpecificationTest {
-
- /**
- * Dummy for test.
- */
- public static final class DummyRunnable extends AbstractTwillRunnable {
-
- @Override
- public void stop() {
- // no-op
- }
-
- @Override
- public void run() {
- // no-op
- }
- }
-
- @Test
- public void testAnyOrder() {
- TwillSpecification spec =
- TwillSpecification.Builder.with()
- .setName("Testing")
- .withRunnable()
- .add("r1", new DummyRunnable()).noLocalFiles()
- .add("r2", new DummyRunnable()).noLocalFiles()
- .add("r3", new DummyRunnable()).noLocalFiles()
- .anyOrder()
- .build();
-
- Assert.assertEquals(3, spec.getRunnables().size());
- List<TwillSpecification.Order> orders = spec.getOrders();
- Assert.assertEquals(1, orders.size());
- Assert.assertEquals(ImmutableSet.of("r1", "r2", "r3"), orders.get(0).getNames());
- }
-
- @Test
- public void testOrder() {
- TwillSpecification spec =
- TwillSpecification.Builder.with()
- .setName("Testing")
- .withRunnable()
- .add("r1", new DummyRunnable()).noLocalFiles()
- .add("r2", new DummyRunnable()).noLocalFiles()
- .add("r3", new DummyRunnable()).noLocalFiles()
- .add("r4", new DummyRunnable()).noLocalFiles()
- .withOrder().begin("r1", "r2").nextWhenStarted("r3")
- .build();
-
- Assert.assertEquals(4, spec.getRunnables().size());
- List<TwillSpecification.Order> orders = spec.getOrders();
- Assert.assertEquals(3, orders.size());
- Assert.assertEquals(ImmutableSet.of("r1", "r2"), orders.get(0).getNames());
- Assert.assertEquals(ImmutableSet.of("r3"), orders.get(1).getNames());
- Assert.assertEquals(ImmutableSet.of("r4"), orders.get(2).getNames());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
deleted file mode 100644
index b55d620..0000000
--- a/yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.TwillRunnerService;
-import org.apache.twill.filesystem.LocalLocationFactory;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.internal.yarn.YarnUtils;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test suite for all tests with mini yarn cluster.
- */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
- EchoServerTestRun.class,
- ResourceReportTestRun.class,
- TaskCompletedTestRun.class,
- DistributeShellTestRun.class,
- LocalFileTestRun.class,
- FailureRestartTestRun.class,
- ProvisionTimeoutTestRun.class
- })
-public class YarnTestSuite {
- private static final Logger LOG = LoggerFactory.getLogger(YarnTestSuite.class);
-
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- private static InMemoryZKServer zkServer;
- private static MiniYARNCluster cluster;
- private static TwillRunnerService runnerService;
- private static YarnConfiguration config;
-
- @BeforeClass
- public static final void init() throws IOException {
- // Starts Zookeeper
- zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
-
- // Start YARN mini cluster
- config = new YarnConfiguration(new Configuration());
-
- if (YarnUtils.isHadoop20()) {
- config.set("yarn.resourcemanager.scheduler.class",
- "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
- } else {
- config.set("yarn.resourcemanager.scheduler.class",
- "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
- config.set("yarn.scheduler.capacity.resource-calculator",
- "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
- }
- config.set("yarn.minicluster.fixed.ports", "true");
- config.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
- config.set("yarn.nodemanager.vmem-check-enabled", "false");
- config.set("yarn.scheduler.minimum-allocation-mb", "128");
- config.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
-
- cluster = new MiniYARNCluster("test-cluster", 1, 1, 1);
- cluster.init(config);
- cluster.start();
-
- runnerService = createTwillRunnerService();
- runnerService.startAndWait();
- }
-
- @AfterClass
- public static final void finish() {
- runnerService.stopAndWait();
- cluster.stop();
- zkServer.stopAndWait();
- }
-
- public static final TwillRunner getTwillRunner() {
- return runnerService;
- }
-
- /**
- * Creates an unstarted instance of {@link org.apache.twill.api.TwillRunnerService}.
- */
- public static final TwillRunnerService createTwillRunnerService() throws IOException {
- return new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill",
- new LocalLocationFactory(tmpFolder.newFolder()));
- }
-
- public static final <T> boolean waitForSize(Iterable<T> iterable, int count, int limit) throws InterruptedException {
- int trial = 0;
- int size = Iterables.size(iterable);
- while (size != count && trial < limit) {
- LOG.info("Waiting for {} size {} == {}", iterable, size, count);
- TimeUnit.SECONDS.sleep(1);
- trial++;
- size = Iterables.size(iterable);
- }
- return trial < limit;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/resources/header.txt
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/header.txt b/yarn/src/test/resources/header.txt
deleted file mode 100644
index b6e25e6..0000000
--- a/yarn/src/test/resources/header.txt
+++ /dev/null
@@ -1 +0,0 @@
-Local file header
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/logback-test.xml b/yarn/src/test/resources/logback-test.xml
deleted file mode 100644
index 2615cb4..0000000
--- a/yarn/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!-- Default logback configuration for twill library -->
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
- </encoder>
- </appender>
-
- <logger name="org.apache.twill" level="DEBUG" />
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/zookeeper/pom.xml b/zookeeper/pom.xml
deleted file mode 100644
index e76ee50..0000000
--- a/zookeeper/pom.xml
+++ /dev/null
@@ -1,67 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>twill-parent</artifactId>
- <groupId>org.apache.twill</groupId>
- <version>0.1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>twill-zookeeper</artifactId>
- <name>Twill ZooKeeper client library</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>twill-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
deleted file mode 100644
index 9e4f55f..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.zookeeper.NodeChildren;
-import com.google.common.base.Objects;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.List;
-
-/**
- *
- */
-final class BasicNodeChildren implements NodeChildren {
-
- private final Stat stat;
- private final List<String> children;
-
- BasicNodeChildren(List<String> children, Stat stat) {
- this.stat = stat;
- this.children = children;
- }
-
- @Override
- public Stat getStat() {
- return stat;
- }
-
- @Override
- public List<String> getChildren() {
- return children;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || !(o instanceof NodeChildren)) {
- return false;
- }
-
- NodeChildren that = (NodeChildren) o;
- return stat.equals(that.getStat()) && children.equals(that.getChildren());
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(children, stat);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
deleted file mode 100644
index 98a3a66..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.zookeeper.NodeData;
-import com.google.common.base.Objects;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.Arrays;
-
-/**
- * A straightforward implementation for {@link NodeData}.
- */
-final class BasicNodeData implements NodeData {
-
- private final byte[] data;
- private final Stat stat;
-
- BasicNodeData(byte[] data, Stat stat) {
- this.data = data;
- this.stat = stat;
- }
-
- @Override
- public Stat getStat() {
- return stat;
- }
-
- @Override
- public byte[] getData() {
- return data;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || !(o instanceof NodeData)) {
- return false;
- }
-
- BasicNodeData that = (BasicNodeData) o;
-
- return stat.equals(that.getStat()) && Arrays.equals(data, that.getData());
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(data, stat);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
deleted file mode 100644
index c52fb08..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
+++ /dev/null
@@ -1,525 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClientService;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The base implementation of {@link ZKClientService}.
- */
-public final class DefaultZKClientService implements ZKClientService {
-
- private static final Logger LOG = LoggerFactory.getLogger(DefaultZKClientService.class);
-
- private final String zkStr;
- private final int sessionTimeout;
- private final List<Watcher> connectionWatchers;
- private final AtomicReference<ZooKeeper> zooKeeper;
- private final Function<String, List<ACL>> aclMapper;
- private final Service serviceDelegate;
- private ExecutorService eventExecutor;
-
- public DefaultZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher) {
- this.zkStr = zkStr;
- this.sessionTimeout = sessionTimeout;
- this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
- addConnectionWatcher(connectionWatcher);
-
- this.zooKeeper = new AtomicReference<ZooKeeper>();
-
- // TODO (terence): Add ACL
- aclMapper = new Function<String, List<ACL>>() {
- @Override
- public List<ACL> apply(String input) {
- return ZooDefs.Ids.OPEN_ACL_UNSAFE;
- }
- };
- serviceDelegate = new ServiceDelegate();
- }
-
- @Override
- public Long getSessionId() {
- ZooKeeper zk = zooKeeper.get();
- return zk == null ? null : zk.getSessionId();
- }
-
- @Override
- public String getConnectString() {
- return zkStr;
- }
-
- @Override
- public void addConnectionWatcher(Watcher watcher) {
- if (watcher != null) {
- connectionWatchers.add(wrapWatcher(watcher));
- }
- }
-
- @Override
- public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
- return create(path, data, createMode, true);
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data,
- CreateMode createMode, boolean createParent) {
- return doCreate(path, data, createMode, createParent, false);
- }
-
- private OperationFuture<String> doCreate(final String path,
- @Nullable final byte[] data,
- final CreateMode createMode,
- final boolean createParent,
- final boolean ignoreNodeExists) {
- final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().create(path, data, aclMapper.apply(path), createMode, Callbacks.STRING, createFuture);
- if (!createParent) {
- return createFuture;
- }
-
- // If create parent is request, return a different future
- final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
- // Watch for changes in the original future
- Futures.addCallback(createFuture, new FutureCallback<String>() {
- @Override
- public void onSuccess(String path) {
- // Propagate if creation was successful
- result.set(path);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // See if the failure can be handled
- if (updateFailureResult(t, result, path, ignoreNodeExists)) {
- return;
- }
- // Create the parent node
- String parentPath = getParent(path);
- if (parentPath.isEmpty()) {
- result.setException(t);
- return;
- }
- // Watch for parent creation complete
- Futures.addCallback(
- doCreate(parentPath, null, CreateMode.PERSISTENT, createParent, true), new FutureCallback<String>() {
- @Override
- public void onSuccess(String parentPath) {
- // Create the requested path again
- Futures.addCallback(
- doCreate(path, data, createMode, false, ignoreNodeExists), new FutureCallback<String>() {
- @Override
- public void onSuccess(String pathResult) {
- result.set(pathResult);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // handle the failure
- updateFailureResult(t, result, path, ignoreNodeExists);
- }
- });
- }
-
- @Override
- public void onFailure(Throwable t) {
- result.setException(t);
- }
- });
- }
-
- /**
- * Updates the result future based on the given {@link Throwable}.
- * @param t Cause of the failure
- * @param result Future to be updated
- * @param path Request path for the operation
- * @return {@code true} if it is a failure, {@code false} otherwise.
- */
- private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
- String path, boolean ignoreNodeExists) {
- // Propagate if there is error
- if (!(t instanceof KeeperException)) {
- result.setException(t);
- return true;
- }
- KeeperException.Code code = ((KeeperException) t).code();
- // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
- if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
- // The requested path could be used because it only applies to non-sequential node
- result.set(path);
- return false;
- }
- if (code != KeeperException.Code.NONODE) {
- result.setException(t);
- return true;
- }
- return false;
- }
-
- /**
- * Gets the parent of the given path.
- * @param path Path for computing its parent
- * @return Parent of the given path, or empty string if the given path is the root path already.
- */
- private String getParent(String path) {
- String parentPath = path.substring(0, path.lastIndexOf('/'));
- return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
- }
- });
-
- return result;
- }
-
- @Override
- public OperationFuture<Stat> exists(String path) {
- return exists(path, null);
- }
-
- @Override
- public OperationFuture<Stat> exists(String path, Watcher watcher) {
- SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
- return result;
- }
-
- @Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return getChildren(path, null);
- }
-
- @Override
- public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
- SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
- return result;
- }
-
- @Override
- public OperationFuture<NodeData> getData(String path) {
- return getData(path, null);
- }
-
- @Override
- public OperationFuture<NodeData> getData(String path, Watcher watcher) {
- SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
-
- return result;
- }
-
- @Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return setData(path, data, -1);
- }
-
- @Override
- public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
- SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
- getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
- return result;
- }
-
- @Override
- public OperationFuture<String> delete(String path) {
- return delete(path, -1);
- }
-
- @Override
- public OperationFuture<String> delete(String deletePath, int version) {
- SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
- getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
- return result;
- }
-
- @Override
- public Supplier<ZooKeeper> getZooKeeperSupplier() {
- return new Supplier<ZooKeeper>() {
- @Override
- public ZooKeeper get() {
- return getZooKeeper();
- }
- };
- }
-
- @Override
- public ListenableFuture<State> start() {
- return serviceDelegate.start();
- }
-
- @Override
- public State startAndWait() {
- return serviceDelegate.startAndWait();
- }
-
- @Override
- public boolean isRunning() {
- return serviceDelegate.isRunning();
- }
-
- @Override
- public State state() {
- return serviceDelegate.state();
- }
-
- @Override
- public ListenableFuture<State> stop() {
- return serviceDelegate.stop();
- }
-
- @Override
- public State stopAndWait() {
- return serviceDelegate.stopAndWait();
- }
-
- @Override
- public void addListener(Listener listener, Executor executor) {
- serviceDelegate.addListener(listener, executor);
- }
-
- /**
- * @return Current {@link ZooKeeper} client.
- */
- private ZooKeeper getZooKeeper() {
- ZooKeeper zk = zooKeeper.get();
- Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
- return zk;
- }
-
- /**
- * Wraps the given watcher to be called from the event executor.
- * @param watcher Watcher to be wrapped
- * @return The wrapped Watcher
- */
- private Watcher wrapWatcher(final Watcher watcher) {
- if (watcher == null) {
- return null;
- }
- return new Watcher() {
- @Override
- public void process(final WatchedEvent event) {
- eventExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- watcher.process(event);
- } catch (Throwable t) {
- LOG.error("Watcher throws exception.", t);
- }
- }
- });
- }
- };
- }
-
- private final class ServiceDelegate extends AbstractService implements Watcher {
-
- @Override
- protected void doStart() {
- // A single thread executor
- eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
- Threads.createDaemonThreadFactory("zk-client-EventThread")) {
- @Override
- protected void terminated() {
- super.terminated();
- notifyStopped();
- }
- };
-
- try {
- zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, this));
- } catch (IOException e) {
- notifyFailed(e);
- }
- }
-
- @Override
- protected void doStop() {
- ZooKeeper zk = zooKeeper.getAndSet(null);
- if (zk != null) {
- try {
- zk.close();
- } catch (InterruptedException e) {
- notifyFailed(e);
- } finally {
- eventExecutor.shutdown();
- }
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- try {
- if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
- LOG.info("Connected to ZooKeeper: " + zkStr);
- notifyStarted();
- return;
- }
- if (event.getState() == Event.KeeperState.Expired) {
- LOG.info("ZooKeeper session expired: " + zkStr);
-
- // When connection expired, simply reconnect again
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, ServiceDelegate.this));
- } catch (IOException e) {
- zooKeeper.set(null);
- notifyFailed(e);
- }
- }
- }, "zk-reconnect");
- t.setDaemon(true);
- t.start();
- }
- } finally {
- if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) {
- for (Watcher connectionWatcher : connectionWatchers) {
- connectionWatcher.process(event);
- }
- }
- }
- }
- }
-
- /**
- * Collection of generic callbacks that simply reflect results into OperationFuture.
- */
- private static final class Callbacks {
- static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, String name) {
- SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set((name == null || name.isEmpty()) ? path : name);
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(stat);
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- /**
- * A stat callback that treats NONODE as success.
- */
- static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
- result.set(stat);
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(new BasicNodeChildren(children, stat));
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(new BasicNodeData(data, stat));
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx) {
- SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(result.getRequestPath());
- return;
- }
- // Otherwise, it is an error
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
deleted file mode 100644
index 65ceadb..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.ForwardingZKClient;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.RetryStrategy;
-import org.apache.twill.zookeeper.RetryStrategy.OperationType;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A {@link ZKClient} that will invoke {@link RetryStrategy} on operation failure.
- * This {@link ZKClient} works by delegating calls to another {@link ZKClient}
- * and listen for the result. If the result is a failure, and is
- * {@link RetryUtils#canRetry(org.apache.zookeeper.KeeperException.Code) retryable}, the given {@link RetryStrategy}
- * will be called to determine the next retry time, or give up, depending on the value returned by the strategy.
- */
-public final class FailureRetryZKClient extends ForwardingZKClient {
-
- private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(
- Threads.createDaemonThreadFactory("retry-zkclient"));
- private final RetryStrategy retryStrategy;
-
- public FailureRetryZKClient(ZKClient delegate, RetryStrategy retryStrategy) {
- super(delegate);
- this.retryStrategy = retryStrategy;
- }
-
- @Override
- public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
- return create(path, data, createMode, true);
- }
-
- @Override
- public OperationFuture<String> create(final String path, final byte[] data,
- final CreateMode createMode, final boolean createParent) {
-
- // No retry for any SEQUENTIAL node, as some algorithms depends on only one sequential node being created.
- if (createMode == CreateMode.PERSISTENT_SEQUENTIAL || createMode == CreateMode.EPHEMERAL_SEQUENTIAL) {
- return super.create(path, data, createMode, createParent);
- }
-
- final SettableOperationFuture<String> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
- Futures.addCallback(super.create(path, data, createMode, createParent),
- new OperationFutureCallback<String>(OperationType.CREATE, System.currentTimeMillis(),
- path, result, new Supplier<OperationFuture<String>>() {
- @Override
- public OperationFuture<String> get() {
- return FailureRetryZKClient.super.create(path, data, createMode, createParent);
- }
- }));
- return result;
- }
-
- @Override
- public OperationFuture<Stat> exists(String path) {
- return exists(path, null);
- }
-
- @Override
- public OperationFuture<Stat> exists(final String path, final Watcher watcher) {
- final SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
- Futures.addCallback(super.exists(path, watcher),
- new OperationFutureCallback<Stat>(OperationType.EXISTS, System.currentTimeMillis(),
- path, result, new Supplier<OperationFuture<Stat>>() {
- @Override
- public OperationFuture<Stat> get() {
- return FailureRetryZKClient.super.exists(path, watcher);
- }
- }));
- return result;
- }
-
- @Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return getChildren(path, null);
- }
-
- @Override
- public OperationFuture<NodeChildren> getChildren(final String path, final Watcher watcher) {
- final SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path,
- Threads.SAME_THREAD_EXECUTOR);
- Futures.addCallback(super.getChildren(path, watcher),
- new OperationFutureCallback<NodeChildren>(OperationType.GET_CHILDREN,
- System.currentTimeMillis(), path, result,
- new Supplier<OperationFuture<NodeChildren>>() {
- @Override
- public OperationFuture<NodeChildren> get() {
- return FailureRetryZKClient.super.getChildren(path, watcher);
- }
- }));
- return result;
- }
-
- @Override
- public OperationFuture<NodeData> getData(String path) {
- return getData(path, null);
- }
-
- @Override
- public OperationFuture<NodeData> getData(final String path, final Watcher watcher) {
- final SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
- Futures.addCallback(super.getData(path, watcher),
- new OperationFutureCallback<NodeData>(OperationType.GET_DATA, System.currentTimeMillis(),
- path, result, new Supplier<OperationFuture<NodeData>>() {
- @Override
- public OperationFuture<NodeData> get() {
- return FailureRetryZKClient.super.getData(path, watcher);
- }
- }));
- return result;
- }
-
- @Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return setData(path, data, -1);
- }
-
- @Override
- public OperationFuture<Stat> setData(final String dataPath, final byte[] data, final int version) {
- final SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, Threads.SAME_THREAD_EXECUTOR);
- Futures.addCallback(super.setData(dataPath, data, version),
- new OperationFutureCallback<Stat>(OperationType.SET_DATA, System.currentTimeMillis(),
- dataPath, result, new Supplier<OperationFuture<Stat>>() {
- @Override
- public OperationFuture<Stat> get() {
- return FailureRetryZKClient.super.setData(dataPath, data, version);
- }
- }));
- return result;
- }
-
- @Override
- public OperationFuture<String> delete(String path) {
- return delete(path, -1);
- }
-
- @Override
- public OperationFuture<String> delete(final String deletePath, final int version) {
- final SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath,
- Threads.SAME_THREAD_EXECUTOR);
- Futures.addCallback(super.delete(deletePath, version),
- new OperationFutureCallback<String>(OperationType.DELETE, System.currentTimeMillis(),
- deletePath, result, new Supplier<OperationFuture<String>>
- () {
- @Override
- public OperationFuture<String> get() {
- return FailureRetryZKClient.super.delete(deletePath, version);
- }
- }));
- return result;
- }
-
- /**
- * Callback to watch for operation result and trigger retry if necessary.
- * @param <V> Type of operation result.
- */
- private final class OperationFutureCallback<V> implements FutureCallback<V> {
-
- private final OperationType type;
- private final long startTime;
- private final String path;
- private final SettableOperationFuture<V> result;
- private final Supplier<OperationFuture<V>> retryAction;
- private final AtomicInteger failureCount;
-
- private OperationFutureCallback(OperationType type, long startTime, String path,
- SettableOperationFuture<V> result, Supplier<OperationFuture<V>> retryAction) {
- this.type = type;
- this.startTime = startTime;
- this.path = path;
- this.result = result;
- this.retryAction = retryAction;
- this.failureCount = new AtomicInteger(0);
- }
-
- @Override
- public void onSuccess(V result) {
- this.result.set(result);
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (!doRetry(t)) {
- result.setException(t);
- }
- }
-
- private boolean doRetry(Throwable t) {
- if (!RetryUtils.canRetry(t)) {
- return false;
- }
-
- // Determine the relay delay
- long nextRetry = retryStrategy.nextRetry(failureCount.incrementAndGet(), startTime, type, path);
- if (nextRetry < 0) {
- return false;
- }
-
- // Schedule the retry.
- SCHEDULER.schedule(new Runnable() {
- @Override
- public void run() {
- Futures.addCallback(retryAction.get(), OperationFutureCallback.this);
- }
- }, nextRetry, TimeUnit.MILLISECONDS);
-
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
deleted file mode 100644
index c4eed59..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public final class InMemoryZKServer implements Service {
-
- private static final Logger LOG = LoggerFactory.getLogger(InMemoryZKServer.class);
-
- private final File dataDir;
- private final int tickTime;
- private final boolean autoClean;
- private final int port;
- private final Service delegateService = new AbstractIdleService() {
- @Override
- protected void startUp() throws Exception {
- ZooKeeperServer zkServer = new ZooKeeperServer();
- FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
- zkServer.setTxnLogFactory(ftxn);
- zkServer.setTickTime(tickTime);
-
- factory = ServerCnxnFactory.createFactory();
- factory.configure(getAddress(port), -1);
- factory.startup(zkServer);
-
- LOG.info("In memory ZK started: " + getConnectionStr());
- }
-
- @Override
- protected void shutDown() throws Exception {
- try {
- factory.shutdown();
- } finally {
- if (autoClean) {
- cleanDir(dataDir);
- }
- }
- }
- };
-
- private ServerCnxnFactory factory;
-
- public static Builder builder() {
- return new Builder();
- }
-
- private InMemoryZKServer(File dataDir, int tickTime, boolean autoClean, int port) {
- if (dataDir == null) {
- dataDir = Files.createTempDir();
- autoClean = true;
- } else {
- Preconditions.checkArgument(dataDir.isDirectory() || dataDir.mkdirs() || dataDir.isDirectory());
- }
-
- this.dataDir = dataDir;
- this.tickTime = tickTime;
- this.autoClean = autoClean;
- this.port = port;
- }
-
- public String getConnectionStr() {
- InetSocketAddress addr = factory.getLocalAddress();
- return String.format("%s:%d", addr.getHostName(), addr.getPort());
- }
-
- public InetSocketAddress getLocalAddress() {
- return factory.getLocalAddress();
- }
-
- private InetSocketAddress getAddress(int port) {
- try {
-// return new InetSocketAddress(InetAddress.getByAddress(new byte[] {127, 0, 0, 1}), port < 0 ? 0 : port);
- return new InetSocketAddress(InetAddress.getLocalHost(), port < 0 ? 0 : port);
- } catch (UnknownHostException e) {
- throw Throwables.propagate(e);
- }
- }
-
- private void cleanDir(File dir) {
- File[] files = dir.listFiles();
- if (files == null) {
- return;
- }
- for (File file : files) {
- if (file.isDirectory()) {
- cleanDir(file);
- }
- file.delete();
- }
- }
-
- @Override
- public ListenableFuture<State> start() {
- return delegateService.start();
- }
-
- @Override
- public State startAndWait() {
- return delegateService.startAndWait();
- }
-
- @Override
- public boolean isRunning() {
- return delegateService.isRunning();
- }
-
- @Override
- public State state() {
- return delegateService.state();
- }
-
- @Override
- public ListenableFuture<State> stop() {
- return delegateService.stop();
- }
-
- @Override
- public State stopAndWait() {
- return delegateService.stopAndWait();
- }
-
- @Override
- public void addListener(Listener listener, Executor executor) {
- delegateService.addListener(listener, executor);
- }
-
- /**
- * Builder for creating instance of {@link InMemoryZKServer}.
- */
- public static final class Builder {
- private File dataDir;
- private boolean autoCleanDataDir = false;
- private int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
- private int port = -1;
-
- public Builder setDataDir(File dataDir) {
- this.dataDir = dataDir;
- return this;
- }
-
- public Builder setAutoCleanDataDir(boolean auto) {
- this.autoCleanDataDir = auto;
- return this;
- }
-
- public Builder setTickTime(int tickTime) {
- this.tickTime = tickTime;
- return this;
- }
-
- public Builder setPort(int port) {
- this.port = port;
- return this;
- }
-
- public InMemoryZKServer build() {
- return new InMemoryZKServer(dataDir, tickTime, autoCleanDataDir, port);
- }
-
- private Builder() {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
deleted file mode 100644
index bc01f08..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/KillZKSession.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import com.google.common.base.Preconditions;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility class for killing ZK client to simulate failures during testing.
- */
-public final class KillZKSession {
-
- /**
- * Utility classes should have a public constructor or a default constructor
- * hence made it private.
- */
- private KillZKSession() {}
-
- /**
- * Kills a Zookeeper client to simulate failure scenarious during testing.
- * Callee will provide the amount of time to wait before it's considered failure
- * to kill a client.
- *
- * @param client that needs to be killed.
- * @param connectionString of Quorum
- * @param maxMs time in millisecond specifying the max time to kill a client.
- * @throws IOException When there is IO error
- * @throws InterruptedException When call has been interrupted.
- */
- public static void kill(ZooKeeper client, String connectionString,
- int maxMs) throws IOException, InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- ZooKeeper zk = new ZooKeeper(connectionString, maxMs, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (event.getState() == Event.KeeperState.SyncConnected) {
- latch.countDown();
- }
- }
- }, client.getSessionId(), client.getSessionPasswd());
-
- try {
- Preconditions.checkState(latch.await(maxMs, TimeUnit.MILLISECONDS), "Fail to kill ZK connection.");
- } finally {
- zk.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
deleted file mode 100644
index 1a82e4b..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.ForwardingZKClient;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link ZKClient} that namespace every paths.
- */
-public final class NamespaceZKClient extends ForwardingZKClient {
- // This class extends from ForwardingZKClient but overrides every method is for letting the
- // ZKClientServices delegate logic works.
-
- private final String namespace;
- private final ZKClient delegate;
- private final String connectString;
-
- public NamespaceZKClient(ZKClient delegate, String namespace) {
- super(delegate);
- this.namespace = namespace;
- this.delegate = delegate;
- this.connectString = delegate.getConnectString() + namespace;
- }
-
- @Override
- public Long getSessionId() {
- return delegate.getSessionId();
- }
-
- @Override
- public String getConnectString() {
- return connectString;
- }
-
- @Override
- public void addConnectionWatcher(Watcher watcher) {
- delegate.addConnectionWatcher(watcher);
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
- return relayPath(delegate.create(namespace + path, data, createMode), this.<String>createFuture(path));
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
- boolean createParent) {
- return relayPath(delegate.create(namespace + path, data, createMode, createParent),
- this.<String>createFuture(path));
- }
-
- @Override
- public OperationFuture<Stat> exists(String path) {
- return relayFuture(delegate.exists(namespace + path), this.<Stat>createFuture(path));
- }
-
- @Override
- public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
- return relayFuture(delegate.exists(namespace + path, watcher), this.<Stat>createFuture(path));
- }
-
- @Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return relayFuture(delegate.getChildren(namespace + path), this.<NodeChildren>createFuture(path));
- }
-
- @Override
- public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
- return relayFuture(delegate.getChildren(namespace + path, watcher), this.<NodeChildren>createFuture(path));
- }
-
- @Override
- public OperationFuture<NodeData> getData(String path) {
- return relayFuture(delegate.getData(namespace + path), this.<NodeData>createFuture(path));
- }
-
- @Override
- public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
- return relayFuture(delegate.getData(namespace + path, watcher), this.<NodeData>createFuture(path));
- }
-
- @Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return relayFuture(delegate.setData(namespace + path, data), this.<Stat>createFuture(path));
- }
-
- @Override
- public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
- return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
- }
-
- @Override
- public OperationFuture<String> delete(String path) {
- return relayPath(delegate.delete(namespace + path), this.<String>createFuture(path));
- }
-
- @Override
- public OperationFuture<String> delete(String deletePath, int version) {
- return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
- }
-
- private <V> SettableOperationFuture<V> createFuture(String path) {
- return SettableOperationFuture.create(namespace + path, Threads.SAME_THREAD_EXECUTOR);
- }
-
- private <V> OperationFuture<V> relayFuture(final OperationFuture<V> from, final SettableOperationFuture<V> to) {
- Futures.addCallback(from, new FutureCallback<V>() {
- @Override
- public void onSuccess(V result) {
- to.set(result);
- }
-
- @Override
- public void onFailure(Throwable t) {
- to.setException(t);
- }
- });
- return to;
- }
-
- private OperationFuture<String> relayPath(final OperationFuture<String> from,
- final SettableOperationFuture<String> to) {
- from.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- String path = from.get();
- to.set(path.substring(namespace.length()));
- } catch (Exception e) {
- to.setException(e.getCause());
- }
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- return to;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
----------------------------------------------------------------------
diff --git a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java b/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
deleted file mode 100644
index fb42491..0000000
--- a/zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RetryUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.zookeeper;
-
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Utility class for help determining operation retry condition.
- */
-final class RetryUtils {
-
- /**
- * Tells if a given operation error code can be retried or not.
- * @param code The error code of the operation.
- * @return {@code true} if the operation can be retried.
- */
- public static boolean canRetry(KeeperException.Code code) {
- return (code == KeeperException.Code.CONNECTIONLOSS
- || code == KeeperException.Code.OPERATIONTIMEOUT
- || code == KeeperException.Code.SESSIONEXPIRED
- || code == KeeperException.Code.SESSIONMOVED);
- }
-
- /**
- * Tells if a given operation exception can be retried or not.
- * @param t The exception raised by an operation.
- * @return {@code true} if the operation can be retried.
- */
- public static boolean canRetry(Throwable t) {
- return t instanceof KeeperException && canRetry(((KeeperException) t).code());
- }
-
- private RetryUtils() {
- }
-}