You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:31 UTC
[15/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
new file mode 100644
index 0000000..508cadb
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.ApplicationBundler;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
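+/**
+ * Tests that {@link ApplicationBundler} can bundle a class into a jar and that the class can be loaded back from it.
+ */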
+public class ApplicationBundlerTest {
+
+ @Rule
+ public TemporaryFolder tmpDir = new TemporaryFolder();
+
+ /**
+ * Bundles {@link ApplicationBundler} itself into a jar, expands the jar and verifies which classloader
+ * ends up defining a bundled class versus a system class.
+ */
+ @Test
+ public void testFindDependencies() throws IOException, ClassNotFoundException {
+ // Create a jar file by tracing the dependencies of ApplicationBundler.
+ Location bundleJar = new LocalLocationFactory(tmpDir.newFolder()).create("test.jar");
+ ApplicationBundler tracer = new ApplicationBundler(ImmutableList.<String>of());
+ tracer.createBundle(bundleJar, ApplicationBundler.class);
+
+ // Expand the jar so that its content can be used as a classloader search path.
+ File expandedDir = tmpDir.newFolder();
+ unjar(new File(bundleJar.toURI()), expandedDir);
+ ClassLoader bundleLoader = createClassLoader(expandedDir);
+
+ // A bundled class must be loaded by the custom classloader.
+ Class<?> bundledClass = bundleLoader.loadClass(ApplicationBundler.class.getName());
+ Assert.assertSame(bundleLoader, bundledClass.getClassLoader());
+
+ // System classes shouldn't be packaged, hence they are loaded by a different classloader.
+ Class<?> systemClass = bundleLoader.loadClass(Object.class.getName());
+ Assert.assertNotSame(bundleLoader, systemClass.getClassLoader());
+ }
+
+ /**
+ * Expands every entry of a jar file underneath a target directory.
+ *
+ * @param jarFile the jar file to read
+ * @param targetDir the directory that receives the jar content
+ * @throws IOException if the jar cannot be read or an entry cannot be written
+ */
+ private void unjar(File jarFile, File targetDir) throws IOException {
+ JarInputStream input = new JarInputStream(new FileInputStream(jarFile));
+ try {
+ for (JarEntry entry = input.getNextJarEntry(); entry != null; entry = input.getNextJarEntry()) {
+ File destination = new File(targetDir, entry.getName());
+ if (!entry.isDirectory()) {
+ // File entry: make sure the parent directory exists before copying the content.
+ destination.getParentFile().mkdirs();
+ ByteStreams.copy(input, Files.newOutputStreamSupplier(destination));
+ continue;
+ }
+ destination.mkdirs();
+ }
+ } finally {
+ // Close the jar stream whether or not the extraction succeeded.
+ input.close();
+ }
+ }
+
+ /**
+ * Creates a classloader over an expanded bundle that looks classes up in the bundle before asking its parent.
+ *
+ * @param dir directory containing the expanded bundle; its {@code classes} directory and every file
+ * directly under its {@code lib} directory are put on the search path
+ * @return a {@link URLClassLoader} that tries its own URLs first and falls back to the parent classloader
+ * @throws MalformedURLException if a file location cannot be converted to a {@link URL}
+ */
+ private ClassLoader createClassLoader(File dir) throws MalformedURLException {
+ List<URL> urls = Lists.newArrayList();
+ urls.add(new File(dir, "classes").toURI().toURL());
+ File[] libFiles = new File(dir, "lib").listFiles();
+ // listFiles() returns null when "lib" is not a directory (for example when it is missing).
+ if (libFiles != null) {
+ for (File file : libFiles) {
+ urls.add(file.toURI().toURL());
+ }
+ }
+ return new URLClassLoader(urls.toArray(new URL[0])) {
+ @Override
+ protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ // Reuse a class this loader has already loaded; defining the same class twice fails with a LinkageError.
+ Class<?> clz = findLoadedClass(name);
+ if (clz == null) {
+ try {
+ // Load class from the given URLs first before delegating to parent.
+ clz = findClass(name);
+ } catch (ClassNotFoundException e) {
+ ClassLoader parent = getParent();
+ clz = parent == null ? ClassLoader.getSystemClassLoader().loadClass(name) : parent.loadClass(name);
+ }
+ }
+ // Honor the resolve flag, as the default ClassLoader.loadClass implementation does.
+ if (resolve) {
+ resolveClass(clz);
+ }
+ return clz;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
new file mode 100644
index 0000000..40fc3ed
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.kafka.client.Compression;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClientService;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
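+/**
+ * Tests publishing, consuming and offset lookup through {@link KafkaClient} against an embedded Kafka server.
+ */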
+public class KafkaTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
+
+ @ClassRule
+ public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+ private static InMemoryZKServer zkServer;
+ private static EmbeddedKafkaServer kafkaServer;
+ private static ZKClientService zkClientService;
+ private static KafkaClient kafkaClient;
+
+ /**
+ * Starts the fixtures shared by all tests in this class: an in-memory ZooKeeper server, an embedded
+ * Kafka server, and the ZooKeeper and Kafka clients used by the tests.
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
+ zkServer.startAndWait();
+
+ // Extract the bundled Kafka archive and start the Kafka server, configured with the ZK server started above.
+ kafkaServer = new EmbeddedKafkaServer(extractKafka(), generateKafkaConfig(zkServer.getConnectionStr()));
+ kafkaServer.startAndWait();
+
+ zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+
+ // The Kafka client is built on the ZK client; start both and block until the start completes.
+ // NOTE(review): chainStart presumably starts the services in the given order - confirm in Services.
+ kafkaClient = new SimpleKafkaClient(zkClientService);
+ Services.chainStart(zkClientService, kafkaClient).get();
+ }
+
+ /**
+ * Stops everything started by {@link #init()}: the two clients first, then the Kafka server and
+ * finally the ZooKeeper server.
+ */
+ @AfterClass
+ public static void finish() throws Exception {
+ // Block until both clients have stopped before shutting down the servers they talk to.
+ Services.chainStop(kafkaClient, zkClientService).get();
+ kafkaServer.stopAndWait();
+ zkServer.stopAndWait();
+ }
+
+ /**
+ * Publishes 30 messages to one topic from three threads, each using a different compression
+ * (GZIP, NONE, SNAPPY), and verifies that a single consumer reads all of them.
+ */
+ @Test
+ public void testKafkaClient() throws Exception {
+ String topic = "testClient";
+
+ // Each publisher thread publishes 10 messages.
+ Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+ Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10);
+
+ t1.start();
+ t2.start();
+
+ // The third publisher is only started after the uncompressed one (t2) has finished.
+ Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10);
+ t2.join();
+ t3.start();
+
+ // Consume until 30 messages were read, the consumer has no more messages, or 5 seconds have passed.
+ // The elapsed time is only checked between messages.
+ Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, 0, 1048576);
+ int count = 0;
+ long startTime = System.nanoTime();
+ while (count < 30 && consumer.hasNext() && secondsPassed(startTime, TimeUnit.NANOSECONDS) < 5) {
+ LOG.info(Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+ count++;
+ }
+
+ Assert.assertEquals(30, count);
+ }
+
+ /**
+ * Verifies offset lookup: the earliest offset stays 0 before and after publishing, and a consumer
+ * started at the latest offset sees a message published afterwards. Fails if it takes over 10 seconds.
+ */
+ @Test (timeout = 10000)
+ public void testOffset() throws Exception {
+ String topic = "testOffset";
+
+ // Initial earliest offset (requested with -2) should be 0.
+ long[] offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
+ Assert.assertArrayEquals(new long[]{0L}, offsets);
+
+ // Publish 2000 messages and wait for the publisher thread to finish.
+ Thread publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 2000);
+ publishThread.start();
+ publishThread.join();
+
+ // Fetch earliest offset, should still be 0.
+ offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
+ Assert.assertArrayEquals(new long[]{0L}, offsets);
+
+ // Fetch latest offset (requested with -1) and start consuming from it.
+ offsets = kafkaClient.getOffset(topic, 0, -1, 10).get();
+ Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, offsets[0], 1048576);
+
+ // Publish one more message, numbered 3000 through the base argument; the consumer should see it.
+ publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 1, 3000);
+ publishThread.start();
+ publishThread.join();
+
+ // Should see the last message being published.
+ Assert.assertTrue(consumer.hasNext());
+ Assert.assertEquals("3000 Testing", Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+ }
+
+ /**
+ * Creates a publisher thread whose messages are numbered starting from 0. Delegates to the
+ * six-argument overload with a base of 0.
+ */
+ private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
+ final Compression compression, final String message, final int count) {
+ return createPublishThread(kafkaClient, topic, compression, message, count, 0);
+ }
+
+ /**
+ * Creates (but does not start) a thread that adds {@code count} messages for the given topic and then
+ * publishes them with a single publish call, waiting for that call to complete.
+ * The i-th message is the UTF-8 encoding of {@code (base + i) + " " + message}.
+ */
+ private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
+ final String message, final int count, final int base) {
+ Runnable publisher = new Runnable() {
+ @Override
+ public void run() {
+ PreparePublish pending = kafkaClient.preparePublish(topic, compression);
+ for (int i = 0; i < count; i++) {
+ String payload = (base + i) + " " + message;
+ pending.add(payload.getBytes(Charsets.UTF_8), 0);
+ }
+ Futures.getUnchecked(pending.publish());
+ }
+ };
+ return new Thread(publisher);
+ }
+
+ /**
+ * Returns the number of whole seconds between the given start time and the current
+ * {@link System#nanoTime()} reading.
+ *
+ * @param startTime a previous {@link System#nanoTime()} reading, expressed in {@code startUnit}
+ * @param startUnit the unit of {@code startTime}
+ */
+ private long secondsPassed(long startTime, TimeUnit startUnit) {
+ long startNanos = startUnit.toNanos(startTime);
+ long elapsedNanos = System.nanoTime() - startNanos;
+ return TimeUnit.NANOSECONDS.toSeconds(elapsedNanos);
+ }
+
+ /**
+ * Extracts the {@code kafka-0.7.2.tgz} classpath resource into a new temporary folder.
+ *
+ * @return the folder containing the extracted archive content
+ * @throws IOException if reading the archive or writing an extracted file fails
+ * @throws ArchiveException if the tar archive stream cannot be created
+ * @throws CompressorException if the gzip stream cannot be created
+ */
+ private static File extractKafka() throws IOException, ArchiveException, CompressorException {
+ File kafkaExtract = TMP_FOLDER.newFolder();
+ InputStream kafkaResource = KafkaTest.class.getClassLoader().getResourceAsStream("kafka-0.7.2.tgz");
+ ArchiveInputStream archiveInput = new ArchiveStreamFactory()
+ .createArchiveInputStream(ArchiveStreamFactory.TAR,
+ new CompressorStreamFactory()
+ .createCompressorInputStream(CompressorStreamFactory.GZIP, kafkaResource));
+
+ try {
+ ArchiveEntry entry = archiveInput.getNextEntry();
+ while (entry != null) {
+ File file = new File(kafkaExtract, entry.getName());
+ if (entry.isDirectory()) {
+ file.mkdirs();
+ } else {
+ // An archive may list a file before (or without) its directory entry, so create the parent first.
+ file.getParentFile().mkdirs();
+ ByteStreams.copy(archiveInput, Files.newOutputStreamSupplier(file));
+ }
+ entry = archiveInput.getNextEntry();
+ }
+ } finally {
+ archiveInput.close();
+ }
+ return kafkaExtract;
+ }
+
+ /**
+ * Builds the configuration for the embedded Kafka server, using a random port and a new temporary log directory.
+ *
+ * @param zkConnectStr the ZooKeeper connection string stored under {@code zk.connect}
+ * @return the Kafka server properties
+ * @throws IOException if the temporary log directory cannot be created
+ */
+ private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
+ int port = Networks.getRandomPort();
+ Preconditions.checkState(port > 0, "Failed to get random port.");
+
+ String[][] settings = {
+ {"log.dir", TMP_FOLDER.newFolder().getAbsolutePath()},
+ {"zk.connect", zkConnectStr},
+ {"num.threads", "8"},
+ {"port", Integer.toString(port)},
+ {"log.flush.interval", "1000"},
+ {"max.socket.request.bytes", "104857600"},
+ {"log.cleanup.interval.mins", "1"},
+ {"log.default.flush.scheduler.interval.ms", "1000"},
+ {"zk.connectiontimeout.ms", "1000000"},
+ {"socket.receive.buffer", "1048576"},
+ {"enable.zookeeper", "true"},
+ {"log.retention.hours", "24"},
+ {"brokerid", "0"},
+ {"socket.send.buffer", "1048576"},
+ {"num.partitions", "1"},
+ // Use a really small file size to force some flush to happen
+ {"log.file.size", "1024"},
+ {"log.default.flush.interval.ms", "1000"},
+ };
+
+ Properties config = new Properties();
+ for (String[] setting : settings) {
+ config.setProperty(setting[0], setting[1]);
+ }
+ return config;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-core/src/test/resources/logback-test.xml b/twill-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..3c36660
--- /dev/null
+++ b/twill-core/src/test/resources/logback-test.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN" />
+ <logger name="org.apache.zookeeper" level="WARN" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-api/pom.xml b/twill-discovery-api/pom.xml
new file mode 100644
index 0000000..e41b214
--- /dev/null
+++ b/twill-discovery-api/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-discovery-api</artifactId>
+ <name>Twill discovery service API</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
new file mode 100644
index 0000000..a5529fe
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/Discoverable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.discovery;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Discoverable defines the attributes of a service to be discovered: its name and its socket address.
+ */
+public interface Discoverable {
+
+ /**
+ * @return Name of the service
+ */
+ String getName();
+
+ /**
+ * @return An {@link InetSocketAddress} representing the host+port of the service.
+ */
+ InetSocketAddress getSocketAddress();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
new file mode 100644
index 0000000..a26fff8
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * DiscoveryService defines the interface for registering {@link Discoverable} services.
+ */
+public interface DiscoveryService {
+
+ /**
+ * Registers a {@link Discoverable} service.
+ *
+ * @param discoverable Information of the service provider that could be discovered.
+ * @return A {@link Cancellable} for un-registration.
+ */
+ Cancellable register(Discoverable discoverable);
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
----------------------------------------------------------------------
diff --git a/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
new file mode 100644
index 0000000..89cf269
--- /dev/null
+++ b/twill-discovery-api/src/main/java/org/apache/twill/discovery/DiscoveryServiceClient.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+/**
+ * Interface for clients to discover services registered with {@link DiscoveryService}.
+ */
+public interface DiscoveryServiceClient {
+
+ /**
+ * Retrieves a list of {@link Discoverable} for the service with the given name.
+ *
+ * @param name Name of the service
+ * @return A live {@link Iterable} that on each call to {@link Iterable#iterator()} returns
+ * an {@link java.util.Iterator Iterator} that reflects the latest set of
+ * available {@link Discoverable} services.
+ */
+ Iterable<Discoverable> discover(String name);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/pom.xml b/twill-discovery-core/pom.xml
new file mode 100644
index 0000000..2612138
--- /dev/null
+++ b/twill-discovery-core/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-discovery-core</artifactId>
+ <name>Twill discovery service implementations</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-discovery-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-zookeeper</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
new file mode 100644
index 0000000..5fa97d1
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/DiscoverableWrapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Wrapper for a discoverable. It copies the name and socket address of the wrapped {@link Discoverable}
+ * at construction time and defines equality and hashing on those two values.
+ */
+final class DiscoverableWrapper implements Discoverable {
+ private final String name;
+ private final InetSocketAddress address;
+
+ /**
+ * @param discoverable the discoverable whose name and socket address are copied
+ */
+ DiscoverableWrapper(Discoverable discoverable) {
+ this.name = discoverable.getName();
+ this.address = discoverable.getSocketAddress();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return address;
+ }
+
+ /**
+ * @return a description of the form {@code {name=<name>, address=<address>}}
+ */
+ @Override
+ public String toString() {
+ // The closing brace was missing, which left the textual form unbalanced.
+ return "{name=" + name + ", address=" + address + "}";
+ }
+
+ /**
+ * @return {@code true} only for another {@link DiscoverableWrapper} with an equal name and socket address
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ // Safe cast: the class check above guarantees that o is a DiscoverableWrapper.
+ Discoverable other = (Discoverable) o;
+
+ return name.equals(other.getName()) && address.equals(other.getSocketAddress());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + address.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
new file mode 100644
index 0000000..7a9e984
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/InMemoryDiscoveryService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+
+import java.util.Iterator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A simple in memory implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ */
+public class InMemoryDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+
+ private final Multimap<String, Discoverable> services = HashMultimap.create();
+ private final Lock lock = new ReentrantLock();
+
+ @Override
+ public Cancellable register(final Discoverable discoverable) {
+ lock.lock();
+ try {
+ final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+ services.put(wrapper.getName(), wrapper);
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ lock.lock();
+ try {
+ services.remove(wrapper.getName(), wrapper);
+ } finally {
+ lock.unlock();
+ }
+ }
+ };
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Iterable<Discoverable> discover(final String name) {
+ return new Iterable<Discoverable>() {
+ @Override
+ public Iterator<Discoverable> iterator() {
+ lock.lock();
+ try {
+ return ImmutableList.copyOf(services.get(name)).iterator();
+ } finally {
+ lock.unlock();
+ }
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
new file mode 100644
index 0000000..e2f9bc0
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.base.Charsets;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ * <p>
+ * Discoverable services are registered within Zookeeper under the namespace 'discoverable' by default.
+ * If you would like to change the namespace under which the services are registered then you can pass
+ * in the namespace during construction of {@link ZKDiscoveryService}.
+ * </p>
+ *
+ * <p>
+ * Following is a simple example of how {@link ZKDiscoveryService} can be used for registering services
+ * and also for discovering the registered services.
+ * <blockquote>
+ * <pre>
+ * {@code
+ *
+ * ZKDiscoveryService service = new ZKDiscoveryService(zkClient);
+ * service.register(new Discoverable() {
+ * @Override
+ * public String getName() {
+ * return "service-name";
+ * }
+ *
+ * @Override
+ * public InetSocketAddress getSocketAddress() {
+ * return new InetSocketAddress(hostname, port);
+ * }
+ * });
+ * ...
+ * ...
+ * Iterable<Discoverable> services = service.discover("service-name");
+ * ...
+ * }
+ * </pre>
+ * </blockquote>
+ * </p>
+ */
+public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
+ private static final String NAMESPACE = "/discoverable";
+
+ private static final long RETRY_MILLIS = 1000;
+
+ // In memory map for recreating ephemeral nodes after session expires.
+ // It maps from discoverable to the corresponding Cancellable.
+ private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
+ private final Lock lock;
+
+ private final LoadingCache<String, Iterable<Discoverable>> services;
+ private final ZKClient zkClient;
+ private final ScheduledExecutorService retryExecutor;
+
+ /**
+ * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry.
+ * Services are stored under the default namespace {@code /discoverable}.
+ *
+ * @param zkClient The {@link ZKClient} for interacting with zookeeper.
+ */
+ public ZKDiscoveryService(ZKClient zkClient) {
+ this(zkClient, NAMESPACE);
+ }
+
+ /**
+ * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry
+ * under the given namespace.
+ *
+ * @param zkClient The {@link ZKClient} for interacting with zookeeper.
+ * @param namespace The namespace under which registered services are stored in zookeeper.
+ * If namespace is {@code null}, no namespace will be used.
+ */
+ public ZKDiscoveryService(ZKClient zkClient, String namespace) {
+ this.discoverables = HashMultimap.create();
+ this.lock = new ReentrantLock();
+ this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
+ Threads.createDaemonThreadFactory("zk-discovery-retry"));
+ if (namespace == null) {
+ // No namespace requested: operate directly on the given client.
+ this.zkClient = zkClient;
+ } else {
+ this.zkClient = ZKClients.namespace(zkClient, namespace);
+ }
+ this.services = CacheBuilder.newBuilder().build(createServiceLoader());
+ // Watch connection state so that registrations can be re-created after a session expiration.
+ this.zkClient.addConnectionWatcher(createConnectionWatcher());
+ }
+
+ /**
+ * Registers a {@link Discoverable} in zookeeper.
+ * <p>
+ * Registering a {@link Discoverable} creates an ephemeral node under
+ * {@code <base>/<service-name>} in zookeeper and blocks until the registration is resolved.
+ * If the node already exists (for example because the ephemeral node of a previous session has not
+ * timed out yet), the situation is resolved by {@code handleRegisterFailure}, which may keep retrying.
+ * Any other failure is rethrown as a runtime exception to make sure that a service with an intent to
+ * register is not started without registering. When a runtime exception is thrown, the expectation is
+ * that the process being started will fail and would be started again by the monitoring service.
+ * </p>
+ * @param discoverable Information of the service provider that could be discovered.
+ * @return An instance of {@link Cancellable}
+ */
+ @Override
+ public Cancellable register(final Discoverable discoverable) {
+ final Discoverable wrapper = new DiscoverableWrapper(discoverable);
+ final SettableFuture<String> future = SettableFuture.create();
+ final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper);
+
+ // Create the zk ephemeral node.
+ Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ // Set the node path to cancellable for future cancellation.
+ cancellable.setPath(result);
+ lock.lock();
+ try {
+ // Remember the registration so that it can be re-created after a session expiration.
+ discoverables.put(wrapper, cancellable);
+ } finally {
+ lock.unlock();
+ }
+ LOG.debug("Service registered: {} {}", wrapper, result);
+ future.set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof KeeperException.NodeExistsException) {
+ // Retry with the same wrapper object that the first attempt and the cancellable use,
+ // rather than the caller supplied instance.
+ handleRegisterFailure(wrapper, future, this, t);
+ } else {
+ LOG.warn("Failed to register: {}", wrapper, t);
+ future.setException(t);
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ // Block until the registration completes; a failure surfaces as an unchecked exception.
+ Futures.getUnchecked(future);
+ return cancellable;
+ }
+
+ /**
+ * Returns the discoverables of the given service. The {@link Iterable} is obtained from the
+ * {@code services} loading cache, which creates it on first request for a service name.
+ *
+ * @param service Name of the service to discover.
+ * @return An {@link Iterable} over the discovered endpoints of the service.
+ */
+ @Override
+ public Iterable<Discoverable> discover(String service) {
+ return services.getUnchecked(service);
+ }
+
+ /**
+ * Handle registration failure caused by an already existing node.
+ * <p>
+ * The existing node is inspected: if it has disappeared, is owned by a different session, or cannot
+ * be inspected, the creation is scheduled for retry; if it is not ephemeral the registration is
+ * failed with the original cause; if it is owned by the session of this client the registration is
+ * treated as completed.
+ * </p>
+ *
+ * @param discoverable The discoverable to register.
+ * @param completion A settable future to set when registration is completed / failed.
+ * @param creationCallback A future callback for path creation.
+ * @param failureCause The original cause of failure.
+ */
+ private void handleRegisterFailure(final Discoverable discoverable,
+ final SettableFuture<String> completion,
+ final FutureCallback<String> creationCallback,
+ final Throwable failureCause) {
+
+ final String path = getNodePath(discoverable);
+ Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ if (result == null) {
+ // If the node is gone, simply retry.
+ LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
+ retryRegister(discoverable, creationCallback);
+ return;
+ }
+
+ long ephemeralOwner = result.getEphemeralOwner();
+ if (ephemeralOwner == 0) {
+ // it is not an ephemeral node, something wrong.
+ LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.",
+ path, discoverable);
+ completion.setException(failureCause);
+ return;
+ }
+ Long sessionId = zkClient.getSessionId();
+ if (sessionId == null || ephemeralOwner != sessionId) {
+ // This zkClient is not valid or doesn't own the ephemeral node, simply keep retrying.
+ LOG.info("Owner of {} is different. Retry registration for {}.", path, discoverable);
+ retryRegister(discoverable, creationCallback);
+ } else {
+ // This client owned the node, treat the registration as completed.
+ // This could happen if same client tries to register twice (due to mistake or failure race condition).
+ completion.set(path);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // If exists call failed, simply retry creation. The throwable is passed to the logger
+ // so that the reason for the failed exists call is not lost.
+ LOG.warn("Error when getting stats on {}. Retry registration for {}.", path, discoverable, t);
+ retryRegister(discoverable, creationCallback);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
+ * Asynchronously creates the ephemeral node of the given discoverable, using its encoded form as
+ * the node data.
+ *
+ * @param discoverable The discoverable to write to zookeeper.
+ * @return The future of the create operation.
+ */
+ private OperationFuture<String> doRegister(Discoverable discoverable) {
+ byte[] payload = encode(discoverable);
+ String nodePath = getNodePath(discoverable);
+ return zkClient.create(nodePath, payload, CreateMode.EPHEMERAL, true);
+ }
+
+ /**
+ * Schedules another creation attempt for the given discoverable after {@code RETRY_MILLIS}
+ * milliseconds. The outcome of that attempt is reported to the given callback.
+ */
+ private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
+ Runnable attempt = new Runnable() {
+ @Override
+ public void run() {
+ OperationFuture<String> creation = doRegister(discoverable);
+ Futures.addCallback(creation, creationCallback, Threads.SAME_THREAD_EXECUTOR);
+ }
+ };
+ retryExecutor.schedule(attempt, RETRY_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+
+ /**
+ * Builds the node path of a {@link Discoverable}: the service name followed by the MD5 hash of
+ * the raw IP address bytes and the port of its socket address.
+ *
+ * @param discoverable An instance of {@link Discoverable}.
+ * @return A path of the form {@code /<name>/<hash>}.
+ */
+ private String getNodePath(Discoverable discoverable) {
+ InetSocketAddress endpoint = discoverable.getSocketAddress();
+ String endpointHash = Hashing.md5()
+ .newHasher()
+ .putBytes(endpoint.getAddress().getAddress())
+ .putInt(endpoint.getPort())
+ .hash().toString();
+
+ return "/" + discoverable.getName() + "/" + endpointHash;
+ }
+
+ /**
+ * Creates the connection {@link Watcher} that re-creates the nodes of all known registrations
+ * when the client gets connected again after a session expiration.
+ */
+ private Watcher createConnectionWatcher() {
+ return new Watcher() {
+ // Watcher is invoked from single event thread, hence safe to use normal mutable variable.
+ private boolean expired;
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.Expired) {
+ LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
+ expired = true;
+ } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
+ LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
+ expired = false;
+
+ // Re-register all services
+ lock.lock();
+ try {
+ for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) {
+ LOG.info("Re-registering service: {}", entry.getKey());
+
+ // Must be non-blocking in here.
+ Futures.addCallback(doRegister(entry.getKey()), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ // Updates the cancellable to the newly created node.
+ entry.getValue().setPath(result);
+ LOG.debug("Service re-registered: {} {}", entry.getKey(), result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // When failed to create the node, there would be no retry and simply make the cancellable do nothing.
+ entry.getValue().setPath(null);
+ LOG.error("Failed to re-register service: {}", entry.getKey(), t);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ };
+ }
+
+ /**
+ * Creates a CacheLoader for creating live Iterable for watching instances changes for a given service.
+ * <p>
+ * For each service name the loader installs a children watch on {@code /<service>}. Whenever the
+ * children change, the data of all children is fetched and decoded, and the resulting immutable list
+ * replaces the previous one. The returned Iterable reads the latest list on every
+ * {@code iterator()} call.
+ * </p>
+ */
+ private CacheLoader<String, Iterable<Discoverable>> createServiceLoader() {
+ return new CacheLoader<String, Iterable<Discoverable>>() {
+ @Override
+ public Iterable<Discoverable> load(String service) throws Exception {
+ // The atomic reference is to keep the resulting Iterable live. It always contains an
+ // immutable snapshot of the latest detected set of Discoverable.
+ final AtomicReference<Iterable<Discoverable>> iterable =
+ new AtomicReference<Iterable<Discoverable>>(ImmutableList.<Discoverable>of());
+ final String serviceBase = "/" + service;
+
+ // Watch for children changes in /service
+ ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() {
+ @Override
+ public void updated(NodeChildren nodeChildren) {
+ // Fetch data of all children nodes in parallel.
+ List<String> children = nodeChildren.getChildren();
+ List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size());
+ for (String child : children) {
+ dataFutures.add(zkClient.getData(serviceBase + "/" + child));
+ }
+
+ // Update the service map when all fetching are done.
+ final ListenableFuture<List<NodeData>> fetchFuture = Futures.successfulAsList(dataFutures);
+ fetchFuture.addListener(new Runnable() {
+ @Override
+ public void run() {
+ ImmutableList.Builder<Discoverable> builder = ImmutableList.builder();
+ for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) {
+ // For successful fetch, decode the content.
+ if (nodeData != null) {
+ Discoverable discoverable = decode(nodeData.getData());
+ // decode() returns null when the node carries no data; such nodes are skipped.
+ if (discoverable != null) {
+ builder.add(discoverable);
+ }
+ }
+ }
+ // Publish the new snapshot; iterators created afterwards will see it.
+ iterable.set(builder.build());
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ });
+
+ return new Iterable<Discoverable>() {
+ @Override
+ public Iterator<Discoverable> iterator() {
+ return iterable.get().iterator();
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Static helper function for decoding UTF-8 encoded JSON bytes into a {@link Discoverable}.
+ * @param bytes the serialized form of a discoverable, may be {@code null}
+ * @return null if bytes are null; else the {@link Discoverable} produced by {@link DiscoverableCodec}
+ */
+ private static Discoverable decode(byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ String json = new String(bytes, Charsets.UTF_8);
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.registerTypeAdapter(Discoverable.class, new DiscoverableCodec());
+ return gsonBuilder.create().fromJson(json, Discoverable.class);
+ }
+
+ /**
+ * Static helper function for encoding a {@link Discoverable} into UTF-8 encoded JSON bytes, using
+ * {@link DiscoverableCodec} as the serializer.
+ * @param discoverable An instance of {@link Discoverable}
+ * @return array of bytes representing an instance of <code>discoverable</code>
+ */
+ private static byte[] encode(Discoverable discoverable) {
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.registerTypeAdapter(DiscoverableWrapper.class, new DiscoverableCodec());
+ String json = gsonBuilder.create().toJson(discoverable, DiscoverableWrapper.class);
+ return json.getBytes(Charsets.UTF_8);
+ }
+
+ /**
+ * Inner class for cancelling (un-register) discovery service.
+ * <p>
+ * The node path is set asynchronously through {@link #setPath(String)}; {@link #cancel()} and
+ * {@link #setPath(String)} cooperate so that the node gets deleted whichever of the two happens last.
+ * </p>
+ */
+ private final class DiscoveryCancellable implements Cancellable {
+
+ private final Discoverable discoverable;
+ private final AtomicBoolean cancelled;
+ private volatile String path;
+
+ DiscoveryCancellable(Discoverable discoverable) {
+ this.discoverable = discoverable;
+ this.cancelled = new AtomicBoolean();
+ }
+
+ /**
+ * Set the zk node path representing the ephemeral node of this registered discoverable.
+ * Called from ZK event thread when creation of the node completed, either from normal registration or
+ * re-registration due to session expiration.
+ *
+ * @param path The path to the ephemeral node, or {@code null} if re-registration failed.
+ */
+ void setPath(String path) {
+ this.path = path;
+ if (cancelled.get() && path != null) {
+ // Simply delete the path if it's already cancelled
+ // It's for the case when session expire happened and re-registration completed after this has been cancelled.
+ // Not bother with the result as if there is error, nothing much we could do.
+ zkClient.delete(path);
+ }
+ }
+
+ /**
+ * Un-registers the discoverable. Only the first call has an effect.
+ */
+ @Override
+ public void cancel() {
+ if (!cancelled.compareAndSet(false, true)) {
+ return;
+ }
+
+ // Take a snapshot of the volatile path.
+ String path = this.path;
+
+ // If it is null, meaning cancel() is called before the ephemeral node is created, hence
+ // setPath() will be called in future (through zk callback when creation is completed)
+ // so that deletion will be done in setPath().
+ // NOTE(review): the path is also null after a failed re-registration (setPath(null)); in that
+ // case this returns without removing the entry from the discoverables map - confirm that is intended.
+ if (path == null) {
+ return;
+ }
+
+ // Remove this Cancellable from the map so that upon session expiration won't try to register.
+ lock.lock();
+ try {
+ discoverables.remove(discoverable, this);
+ } finally {
+ lock.unlock();
+ }
+
+ // Delete the path. It's ok if the path does not exist
+ // (e.g. when the session expired and before the node has been re-created)
+ Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
+ KeeperException.NoNodeException.class, path));
+ LOG.debug("Service unregistered: {} {}", discoverable, path);
+ }
+ }
+
+ /**
+ * Gson codec that writes a {@link Discoverable} as a JSON object with the properties
+ * {@code service}, {@code hostname} and {@code port}, and reads such an object back into a
+ * {@link Discoverable}.
+ */
+ private static final class DiscoverableCodec implements JsonSerializer<Discoverable>, JsonDeserializer<Discoverable> {
+
+ @Override
+ public Discoverable deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject fields = json.getAsJsonObject();
+ final String name = fields.get("service").getAsString();
+ String host = fields.get("hostname").getAsString();
+ int portNumber = fields.get("port").getAsInt();
+ // The address is built once here and shared by every call on the returned discoverable.
+ final InetSocketAddress endpoint = new InetSocketAddress(host, portNumber);
+ return new Discoverable() {
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return endpoint;
+ }
+ };
+ }
+
+ @Override
+ public JsonElement serialize(Discoverable src, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject fields = new JsonObject();
+ fields.addProperty("service", src.getName());
+ fields.addProperty("hostname", src.getSocketAddress().getHostName());
+ fields.addProperty("port", src.getSocketAddress().getPort());
+ return fields;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
new file mode 100644
index 0000000..a1d6e0c
--- /dev/null
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes in this package provide service discovery implementations.
+ */
+package org.apache.twill.discovery;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
new file mode 100644
index 0000000..d8cc375
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/InMemoryDiscoveryServiceTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests the in-memory implementation of the discovery service.
+ */
+public class InMemoryDiscoveryServiceTest {
+
+ /**
+ * Registers an endpoint with the given name, host and port on the given service.
+ */
+ private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+ return service.register(new Discoverable() {
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return new InetSocketAddress(host, port);
+ }
+ });
+ }
+
+ @Test
+ public void simpleDiscoverable() throws Exception {
+ DiscoveryService discoveryService = new InMemoryDiscoveryService();
+ DiscoveryServiceClient discoveryServiceClient = (DiscoveryServiceClient) discoveryService;
+
+ // Register one service running on one host:port
+ Cancellable cancellable = register(discoveryService, "foo", "localhost", 8090);
+ Iterable<Discoverable> discoverables = discoveryServiceClient.discover("foo");
+
+ // Discover that registered host:port.
+ // assertEquals reports the actual size on failure, unlike assertTrue(a == b).
+ Assert.assertEquals(1, Iterables.size(discoverables));
+
+ // Remove the service
+ cancellable.cancel();
+
+ // There should be no service.
+ discoverables = discoveryServiceClient.discover("foo");
+ TimeUnit.MILLISECONDS.sleep(100);
+ Assert.assertEquals(0, Iterables.size(discoverables));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
new file mode 100644
index 0000000..feee8db
--- /dev/null
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.discovery;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Services;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.internal.zookeeper.KillZKSession;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test Zookeeper based discovery service.
+ */
+public class ZKDiscoveryServiceTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryServiceTest.class);
+
+ private static InMemoryZKServer zkServer;
+ private static ZKClientService zkClient;
+
+ /**
+ * Starts an in-memory zookeeper server and the shared zookeeper client used by all tests.
+ * The client is wrapped with re-watch-on-expire and retry-on-failure (fixed one second delay).
+ */
+ @BeforeClass
+ public static void beforeClass() {
+ zkServer = InMemoryZKServer.builder().setTickTime(100000).build();
+ zkServer.startAndWait();
+
+ zkClient = ZKClientServices.delegate(
+ ZKClients.retryOnFailure(
+ ZKClients.reWatchOnExpire(
+ ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+ zkClient.startAndWait();
+ }
+
+ /**
+ * Stops the shared zookeeper client and the in-memory server, waiting for the stop to complete.
+ */
+ @AfterClass
+ public static void afterClass() {
+ Futures.getUnchecked(Services.chainStop(zkClient, zkServer));
+ }
+
+ /**
+ * Registers an endpoint with the given name, host and port on the given service.
+ */
+ private Cancellable register(DiscoveryService service, final String name, final String host, final int port) {
+ Discoverable endpoint = new Discoverable() {
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public InetSocketAddress getSocketAddress() {
+ return new InetSocketAddress(host, port);
+ }
+ };
+ return service.register(endpoint);
+ }
+
+
+ /**
+ * Polls the given iterable, up to ten times with a 10 ms pause before each poll, until it contains
+ * the expected number of elements.
+ *
+ * @return {@code true} if the expected size was observed; otherwise the result of one final check
+ */
+ private boolean waitTillExpected(int expected, Iterable<Discoverable> discoverables) throws Exception {
+ int polls = 0;
+ while (polls < 10) {
+ TimeUnit.MILLISECONDS.sleep(10);
+ if (Iterables.size(discoverables) == expected) {
+ return true;
+ }
+ polls++;
+ }
+ return Iterables.size(discoverables) == expected;
+ }
+
+ /**
+ * Verifies that registering the same name, host and port twice does not fail, both from the same
+ * client and from a second client whose registration goes away when that client is stopped.
+ */
+ @Test (timeout = 5000)
+ public void testDoubleRegister() throws Exception {
+ ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+ DiscoveryServiceClient discoveryServiceClient = discoveryService;
+
+ // Register on the same host port, it shouldn't fail.
+ Cancellable cancellable = register(discoveryService, "test_double_reg", "localhost", 54321);
+ Cancellable cancellable2 = register(discoveryService, "test_double_reg", "localhost", 54321);
+
+ Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_double_reg");
+
+ // Both registrations use the same name, host and port, so only one endpoint is expected.
+ Assert.assertTrue(waitTillExpected(1, discoverables));
+
+ cancellable.cancel();
+ cancellable2.cancel();
+
+ // Register again with two different clients, but killing session of the first one.
+ final ZKClientService zkClient2 = ZKClientServices.delegate(
+ ZKClients.retryOnFailure(
+ ZKClients.reWatchOnExpire(
+ ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
+ zkClient2.startAndWait();
+
+ try {
+ ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2);
+ cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321);
+
+ // Schedule a thread to shutdown zkClient2.
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ TimeUnit.SECONDS.sleep(2);
+ zkClient2.stopAndWait();
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }.start();
+
+ // This call would block until zkClient2 is shutdown.
+ cancellable = register(discoveryService, "test_multi_client", "localhost", 54321);
+ cancellable.cancel();
+
+ } finally {
+ zkClient2.stopAndWait();
+ }
+ }
+
+ /**
+ * Verifies that a registration is visible again after the zookeeper session of the client has been
+ * killed, and that registrations made after the kill are visible as well.
+ */
+ @Test
+ public void testSessionExpires() throws Exception {
+ ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+ DiscoveryServiceClient discoveryServiceClient = discoveryService;
+
+ Cancellable cancellable = register(discoveryService, "test_expires", "localhost", 54321);
+
+ Iterable<Discoverable> discoverables = discoveryServiceClient.discover("test_expires");
+
+ // Discover that registered host:port.
+ Assert.assertTrue(waitTillExpected(1, discoverables));
+
+ KillZKSession.kill(zkClient.getZooKeeperSupplier().get(), zkServer.getConnectionStr(), 5000);
+
+ // Register one more endpoint to make sure state has been reflected after reconnection
+ Cancellable cancellable2 = register(discoveryService, "test_expires", "localhost", 54322);
+
+ // Reconnection would trigger re-registration.
+ Assert.assertTrue(waitTillExpected(2, discoverables));
+
+ cancellable.cancel();
+ cancellable2.cancel();
+
+ // Verify that both are now gone.
+ Assert.assertTrue(waitTillExpected(0, discoverables));
+ }
+
+ /**
+ * Registers a single endpoint through one service instance and verifies that a second instance
+ * sees it appear and, after cancellation, disappear.
+ */
+ @Test
+ public void simpleDiscoverable() throws Exception {
+ DiscoveryService registry = new ZKDiscoveryService(zkClient);
+ DiscoveryServiceClient client = new ZKDiscoveryService(zkClient);
+
+ // Publish one endpoint and expect exactly that one to be discovered.
+ Cancellable registration = register(registry, "foo", "localhost", 8090);
+ Iterable<Discoverable> found = client.discover("foo");
+ Assert.assertTrue(waitTillExpected(1, found));
+
+ // Withdraw the endpoint; a fresh lookup should end up empty.
+ registration.cancel();
+ found = client.discover("foo");
+ Assert.assertTrue(waitTillExpected(0, found));
+ }
+
+ @Test
+ public void manySameDiscoverable() throws Exception {
+ List<Cancellable> cancellables = Lists.newArrayList();
+ DiscoveryService discoveryService = new ZKDiscoveryService(zkClient);
+ DiscoveryServiceClient discoveryServiceClient = new ZKDiscoveryService(zkClient);
+
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 1));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 2));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 3));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 4));
+ cancellables.add(register(discoveryService, "manyDiscoverable", "localhost", 5));
+
+ Iterable<Discoverable> discoverables = discoveryServiceClient.discover("manyDiscoverable");
+ Assert.assertTrue(waitTillExpected(5, discoverables));
+
+ for (int i = 0; i < 5; i++) {
+ cancellables.get(i).cancel();
+ Assert.assertTrue(waitTillExpected(4 - i, discoverables));
+ }
+ }
+
+ /**
+ * Registers endpoints under three different service names and verifies that each name is
+ * discovered independently, that an already obtained iterable reflects later registrations,
+ * and that all names are empty after every registration has been cancelled.
+ */
+ @Test
+ public void multiServiceDiscoverable() throws Exception {
+ List<Cancellable> registrations = Lists.newArrayList();
+ DiscoveryService registry = new ZKDiscoveryService(zkClient);
+ DiscoveryServiceClient client = new ZKDiscoveryService(zkClient);
+
+ // service1: ports 1-5, service2: ports 1-3, service3: ports 1-2.
+ for (int port = 1; port <= 5; port++) {
+ registrations.add(register(registry, "service1", "localhost", port));
+ }
+ for (int port = 1; port <= 3; port++) {
+ registrations.add(register(registry, "service2", "localhost", port));
+ }
+ for (int port = 1; port <= 2; port++) {
+ registrations.add(register(registry, "service3", "localhost", port));
+ }
+
+ Iterable<Discoverable> found = client.discover("service1");
+ Assert.assertTrue(waitTillExpected(5, found));
+
+ found = client.discover("service2");
+ Assert.assertTrue(waitTillExpected(3, found));
+
+ found = client.discover("service3");
+ Assert.assertTrue(waitTillExpected(2, found));
+
+ registrations.add(register(registry, "service3", "localhost", 3));
+ Assert.assertTrue(waitTillExpected(3, found)); // Shows live iterator.
+
+ for (Cancellable registration : registrations) {
+ registration.cancel();
+ }
+
+ Assert.assertTrue(waitTillExpected(0, client.discover("service1")));
+ Assert.assertTrue(waitTillExpected(0, client.discover("service2")));
+ Assert.assertTrue(waitTillExpected(0, client.discover("service3")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-discovery-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/resources/logback-test.xml b/twill-discovery-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..2615cb4
--- /dev/null
+++ b/twill-discovery-core/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library tests: logs to the console; org.apache.twill at DEBUG, everything else at WARN -->
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.twill" level="DEBUG" />
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
new file mode 100644
index 0000000..b11bc7a
--- /dev/null
+++ b/twill-yarn/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-yarn</artifactId>
+ <name>Twill Apache Hadoop YARN library</name>
+
+ <properties>
+ <output.dir>target/classes</output.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-discovery-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>${output.dir}</outputDirectory>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop-2.0</id> <!-- redirects the build output (output.dir, used as outputDirectory above) to ${hadoop20.output.dir} -->
+ <properties>
+ <output.dir>${hadoop20.output.dir}</output.dir>
+ </properties>
+ </profile>
+ <profile>
+ <id>hadoop-2.1</id> <!-- keeps the default output.dir and adds ${hadoop20.output.dir} as an extra resource directory -->
+ <build>
+ <resources>
+ <resource>
+ <directory>${hadoop20.output.dir}</directory> <!-- NOTE(review): hadoop20.output.dir is not defined in this file; presumably inherited from twill-parent, confirm -->
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop-2.2</id> <!-- same resource setup as the hadoop-2.1 profile -->
+ <build>
+ <resources>
+ <resource>
+ <directory>${hadoop20.output.dir}</directory> <!-- NOTE(review): presumably picks up what a hadoop-2.0 profile build wrote there; confirm the intended build order -->
+ </resource>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+ </profile>
+ </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
new file mode 100644
index 0000000..d98dee1
--- /dev/null
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import org.apache.twill.internal.yarn.ports.AMRMClient;
+import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
+import org.apache.twill.internal.yarn.ports.AllocationResponse;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A {@link YarnAMClient} for the Hadoop 2.0 line of YARN, built on the {@link AMRMClient} port.
+ * The application master is registered with the resource manager when this service starts and is
+ * unregistered when it stops. {@link #setTracker(InetSocketAddress, URL)} must be called before
+ * the service is started.
+ */
+public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
+
+  // Wraps each YARN ContainerStatus in the YarnContainerStatus type that is passed to the AllocateHandler.
+  private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM =
+    new Function<ContainerStatus, YarnContainerStatus>() {
+      @Override
+      public YarnContainerStatus apply(ContainerStatus status) {
+        return new Hadoop20YarnContainerStatus(status);
+      }
+    };
+
+  private final ContainerId containerId;
+  // Outstanding container requests, keyed by the id returned from ContainerRequestBuilder.apply().
+  private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
+  private final AMRMClient amrmClient;
+  private final YarnNMClient nmClient;
+  private InetSocketAddress trackerAddr;
+  private URL trackerUrl;
+  // Resource limits reported by the resource manager; populated in startUp().
+  private Resource maxCapability;
+  private Resource minCapability;
+
+  /**
+   * Creates a client for the application attempt of the container named by the
+   * {@link ApplicationConstants#AM_CONTAINER_ID_ENV} environment variable.
+   *
+   * @param conf configuration used to initialize the resource manager and node manager clients
+   * @throws IllegalArgumentException if the environment variable is not set
+   */
+  public Hadoop20YarnAMClient(Configuration conf) {
+    String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+    Preconditions.checkArgument(masterContainerId != null,
+                                "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
+    this.containerId = ConverterUtils.toContainerId(masterContainerId);
+    this.containerRequests = ArrayListMultimap.create();
+
+    this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
+    this.amrmClient.init(conf);
+    this.nmClient = new Hadoop20YarnNMClient(YarnRPC.create(conf), conf);
+  }
+
+  /**
+   * Starts the resource manager client and registers this application master, recording the minimum
+   * and maximum resource capabilities returned by the registration.
+   *
+   * @throws NullPointerException if the tracker address or URL has not been set
+   */
+  @Override
+  protected void startUp() throws Exception {
+    Preconditions.checkNotNull(trackerAddr, "Tracker address not set.");
+    Preconditions.checkNotNull(trackerUrl, "Tracker URL not set.");
+
+    amrmClient.start();
+
+    RegisterApplicationMasterResponse response = amrmClient.registerApplicationMaster(
+      trackerAddr.getHostName(), trackerAddr.getPort(), trackerUrl.toString());
+    maxCapability = response.getMaximumResourceCapability();
+    minCapability = response.getMinimumResourceCapability();
+  }
+
+  /**
+   * Unregisters this application master with {@link FinalApplicationStatus#SUCCEEDED} and stops the
+   * resource manager client.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    try {
+      amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString());
+    } finally {
+      // Stop the client even when unregistering fails, so that it is never left running.
+      amrmClient.stop();
+    }
+  }
+
+  @Override
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * Returns the value of the {@link ApplicationConstants#NM_HOST_ENV} environment variable, or
+   * {@code null} if it is not set.
+   */
+  @Override
+  public String getHost() {
+    return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
+  }
+
+  /**
+   * Sets the tracker address and URL used when registering and unregistering the application master.
+   */
+  @Override
+  public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+    this.trackerAddr = trackerAddr;
+    this.trackerUrl = trackerUrl;
+  }
+
+  /**
+   * Makes one allocate call to the resource manager and dispatches the response to the handler.
+   * Allocated containers are offered through {@code handler.acquired}; containers that were not used to
+   * launch anything are released. Completed container statuses are reported through
+   * {@code handler.completed}. Neither callback is invoked with an empty list.
+   *
+   * @param progress progress value passed to the resource manager
+   * @param handler callback that receives the acquired launchers and the completed statuses
+   */
+  @Override
+  public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+    AllocationResponse response = amrmClient.allocate(progress);
+    List<ProcessLauncher<YarnContainerInfo>> launchers
+      = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+
+    for (Container container : response.getAllocatedContainers()) {
+      launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
+    }
+
+    if (!launchers.isEmpty()) {
+      handler.acquired(launchers);
+
+      // If no process has been launched through the given launcher, return the container.
+      for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+        // This cast always works.
+        RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+        if (!launcher.isLaunched()) {
+          Container container = launcher.getContainerInfo().getContainer();
+          LOG.info("Nothing to run in container, releasing it: {}", container);
+          amrmClient.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+
+    List<YarnContainerStatus> completed = ImmutableList.copyOf(
+      Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
+    if (!completed.isEmpty()) {
+      handler.completed(completed);
+    }
+  }
+
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability) {
+    return addContainerRequest(capability, 1);
+  }
+
+  /**
+   * Creates a builder for {@code count} container requests. The given capability is first adjusted, in
+   * place, to the limits reported by the resource manager. Calling {@code apply()} on the builder submits
+   * the requests and returns an id that can be passed to {@link #completeContainerRequest(String)}.
+   */
+  @Override
+  public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+    return new ContainerRequestBuilder(adjustCapability(capability), count) {
+      @Override
+      public String apply() {
+        synchronized (Hadoop20YarnAMClient.this) {
+          String id = UUID.randomUUID().toString();
+
+          String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+          String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+          for (int i = 0; i < count; i++) {
+            AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
+                                                                                  priority, 1);
+            containerRequests.put(id, request);
+            amrmClient.addContainerRequest(request);
+          }
+
+          return id;
+        }
+      }
+    };
+  }
+
+  /**
+   * Withdraws every container request that was submitted under the given id.
+   */
+  @Override
+  public synchronized void completeContainerRequest(String id) {
+    for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
+      amrmClient.removeContainerRequest(request);
+    }
+  }
+
+  /**
+   * Adjusts the given resource, in place, to the limits reported by the resource manager. Virtual cores
+   * are clamped to the [minimum, maximum] range. Memory is capped at the maximum and rounded up to a
+   * multiple of the minimum, without exceeding the maximum.
+   *
+   * @param resource the requested capability; modified by this call
+   * @return the same {@code resource} instance
+   */
+  private Resource adjustCapability(Resource resource) {
+    int cores = YarnUtils.getVirtualCores(resource);
+    int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
+                                YarnUtils.getVirtualCores(minCapability));
+    // Try to set the virtual cores; older versions of YARN don't support this.
+    if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
+      LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+    }
+
+    int requestedMemory = resource.getMemory();
+    int maxMemory = maxCapability.getMemory();
+    int minMemory = minCapability.getMemory();
+    int updatedMemory = Math.min(requestedMemory, maxMemory);
+    if (minMemory > 0) {
+      // Round up to a multiple of the minimum; a zero minimum would otherwise turn the request into 0 MB.
+      updatedMemory = (int) Math.ceil((double) updatedMemory / minMemory) * minMemory;
+      // Rounding can overshoot when the maximum is not a multiple of the minimum.
+      updatedMemory = Math.min(updatedMemory, maxMemory);
+    }
+
+    if (requestedMemory != updatedMemory) {
+      resource.setMemory(updatedMemory);
+      // Log the value captured before setMemory(); reading it back here would print the new value twice.
+      LOG.info("Adjust memory requirement from {} to {} MB.", requestedMemory, updatedMemory);
+    }
+
+    return resource;
+  }
+}