You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2016/01/26 20:16:19 UTC
[01/22] incubator-twill git commit: Add hsaputra for Henry Saputra as
entry in the master pom, as well as adding for Albert and Poorna.
Repository: incubator-twill
Updated Branches:
refs/heads/site 81a5e462b -> 87e2b4652
Add hsaputra for Henry Saputra as entry in the master pom, as well as adding for Albert and Poorna.
This closes #61 from GitHub.
Signed-off-by: hsaputra <hs...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/d2d90817
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/d2d90817
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/d2d90817
Branch: refs/heads/site
Commit: d2d908172cd7fe3f37ce440a5f7ffc01b43625a6
Parents: a303e3b
Author: hsaputra <hs...@apache.org>
Authored: Wed Aug 5 12:04:11 2015 -0700
Committer: hsaputra <hs...@apache.org>
Committed: Thu Aug 6 16:53:58 2015 -0700
----------------------------------------------------------------------
pom.xml | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d2d90817/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7685b58..4daa68d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
</roles>
</developer>
<developer>
+ <id>ashau</id>
<name>Albert Shau</name>
<email>ashau@apache.org</email>
<roles>
@@ -121,12 +122,21 @@
</roles>
</developer>
<developer>
+ <id>poorna</id>
<name>Poorna Chandra</name>
<email>poorna@apache.org</email>
<roles>
<role>Committer</role>
</roles>
</developer>
+ <developer>
+ <id>hsaputra</id>
+ <name>Henry Saputra</name>
+ <email>hsaputra@apache.org</email>
+ <roles>
+ <role>Committer</role>
+ </roles>
+ </developer>
</developers>
<modules>
[14/22] incubator-twill git commit: (TWILL-158) Added FileContext
Location and LocationFactory
Posted by ch...@apache.org.
(TWILL-158) Added FileContext Location and LocationFactory
- Added new unit-tests.
- Minor improvement on HDFSLocationFactory to handle creation URI correctly
This closes #73 on GitHub.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/388a6d92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/388a6d92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/388a6d92
Branch: refs/heads/site
Commit: 388a6d922dd89dd8e1f5a9fed5aefe265ef2eeed
Parents: b80f9b8
Author: Terence Yim <ch...@apache.org>
Authored: Thu Dec 24 12:38:03 2015 +0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 5 14:54:24 2016 -0800
----------------------------------------------------------------------
.../twill/filesystem/LocalLocationFactory.java | 4 +-
.../twill/filesystem/FileContextLocation.java | 219 +++++++++++++++++++
.../filesystem/FileContextLocationFactory.java | 119 ++++++++++
.../apache/twill/filesystem/HDFSLocation.java | 2 +-
.../twill/filesystem/HDFSLocationFactory.java | 18 +-
.../filesystem/FileContextLocationTest.java | 50 +++++
.../twill/filesystem/HDFSLocationTest.java | 6 +-
.../twill/filesystem/LocalLocationTest.java | 17 +-
.../twill/filesystem/LocationTestBase.java | 68 +++++-
9 files changed, 477 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java
index 82847b2..8e7ab8b 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocationFactory.java
@@ -31,7 +31,7 @@ public final class LocalLocationFactory implements LocationFactory {
* Constructs a LocalLocationFactory that Location created will be relative to system root.
*/
public LocalLocationFactory() {
- this(new File("/"));
+ this(new File(File.separator));
}
public LocalLocationFactory(File basePath) {
@@ -40,7 +40,7 @@ public final class LocalLocationFactory implements LocationFactory {
@Override
public Location create(String path) {
- return new LocalLocation(this, new File(basePath, path));
+ return create(new File(basePath, path).toURI());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
new file mode 100644
index 0000000..f92954e
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocation.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HAUtil;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link Location} using {@link FileContext}.
+ */
+final class FileContextLocation implements Location {
+
+ private final FileContextLocationFactory locationFactory;
+ private final FileContext fc;
+ private final Path path;
+
+ FileContextLocation(FileContextLocationFactory locationFactory, FileContext fc, Path path) {
+ this.locationFactory = locationFactory;
+ this.fc = fc;
+ this.path = path;
+ }
+
+ @Override
+ public boolean exists() throws IOException {
+ return fc.util().exists(path);
+ }
+
+ @Override
+ public String getName() {
+ return path.getName();
+ }
+
+ @Override
+ public boolean createNew() throws IOException {
+ try {
+ fc.create(path, EnumSet.of(CreateFlag.CREATE), Options.CreateOpts.createParent()).close();
+ return true;
+ } catch (FileAlreadyExistsException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return fc.open(path);
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return fc.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.createParent());
+ }
+
+ @Override
+ public OutputStream getOutputStream(String permission) throws IOException {
+ return fc.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ Options.CreateOpts.perms(new FsPermission(permission)),
+ Options.CreateOpts.createParent());
+ }
+
+ @Override
+ public Location append(String child) throws IOException {
+ if (child.startsWith("/")) {
+ child = child.substring(1);
+ }
+ return new FileContextLocation(locationFactory, fc, new Path(URI.create(path.toUri() + "/" + child)));
+ }
+
+ @Override
+ public Location getTempFile(String suffix) throws IOException {
+ Path path = new Path(
+ URI.create(this.path.toUri() + "." + UUID.randomUUID() + (suffix == null ? TEMP_FILE_SUFFIX : suffix)));
+ return new FileContextLocation(locationFactory, fc, path);
+ }
+
+ @Override
+ public URI toURI() {
+ // In HA mode, the path URI returned by path created through FileContext is incompatible with the FileSystem,
+ // which is used inside Hadoop. It is due to the fact that FileContext is not HA aware and it always
+ // append "port" to the path URI, while the DistributedFileSystem always use the cluster logical
+ // name, which doesn't allow having port in it.
+ URI uri = path.toUri();
+ if (HAUtil.isLogicalUri(locationFactory.getConfiguration(), uri)) {
+ try {
+ // Need to strip out the port if in HA
+ return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
+ -1, uri.getPath(), uri.getQuery(), uri.getFragment());
+ } catch (URISyntaxException e) {
+ // Shouldn't happen
+ throw Throwables.propagate(e);
+ }
+ }
+
+ return uri;
+ }
+
+ @Override
+ public boolean delete() throws IOException {
+ return delete(false);
+ }
+
+ @Override
+ public boolean delete(boolean recursive) throws IOException {
+ return fc.delete(path, recursive);
+ }
+
+ @Nullable
+ @Override
+ public Location renameTo(Location destination) throws IOException {
+ Path targetPath = new Path(destination.toURI());
+ try {
+ fc.rename(path, targetPath, Options.Rename.OVERWRITE);
+ return new FileContextLocation(locationFactory, fc, targetPath);
+ } catch (FileAlreadyExistsException | FileNotFoundException | ParentNotDirectoryException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean mkdirs() throws IOException {
+ try {
+ fc.mkdir(path, null, true);
+ return true;
+ } catch (FileAlreadyExistsException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public long length() throws IOException {
+ return fc.getFileStatus(path).getLen();
+ }
+
+ @Override
+ public long lastModified() throws IOException {
+ return fc.getFileStatus(path).getModificationTime();
+ }
+
+ @Override
+ public boolean isDirectory() throws IOException {
+ try {
+ return fc.getFileStatus(path).isDirectory();
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public List<Location> list() throws IOException {
+ RemoteIterator<FileStatus> statuses = fc.listStatus(path);
+ ImmutableList.Builder<Location> result = ImmutableList.builder();
+ while (statuses.hasNext()) {
+ FileStatus status = statuses.next();
+ if (!Objects.equals(path, status.getPath())) {
+ result.add(new FileContextLocation(locationFactory, fc, status.getPath()));
+ }
+ }
+ return result.build();
+
+ }
+
+ @Override
+ public LocationFactory getLocationFactory() {
+ return locationFactory;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FileContextLocation that = (FileContextLocation) o;
+ return Objects.equals(path, that.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocationFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocationFactory.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocationFactory.java
new file mode 100644
index 0000000..d64be71
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/FileContextLocationFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * A {@link LocationFactory} implementation that uses {@link FileContext} to create {@link Location}.
+ */
+public class FileContextLocationFactory implements LocationFactory {
+
+ private final Configuration configuration;
+ private final FileContext fc;
+ private final Path pathBase;
+
+ /**
+ * Same as {@link #FileContextLocationFactory(Configuration, String) FileContextLocationFactory(configuration, "/")}.
+ */
+ public FileContextLocationFactory(Configuration configuration) {
+ this(configuration, "/");
+ }
+
+ /**
+ * Creates a new instance.
+ *
+ * @param configuration the hadoop configuration
+ * @param pathBase base path for all non-absolute location created through this {@link LocationFactory}.
+ */
+ public FileContextLocationFactory(Configuration configuration, String pathBase) {
+ this.configuration = configuration;
+ this.fc = createFileContext(configuration);
+ this.pathBase = new Path(pathBase.startsWith("/") ? pathBase : "/" + pathBase);
+ }
+
+ @Override
+ public Location create(String path) {
+ if (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ Path locationPath;
+ if (path.isEmpty()) {
+ locationPath = pathBase;
+ } else {
+ locationPath = new Path(path);
+ }
+ locationPath = locationPath.makeQualified(fc.getDefaultFileSystem().getUri(), pathBase);
+ return new FileContextLocation(this, fc, locationPath);
+ }
+
+ @Override
+ public Location create(URI uri) {
+ URI contextURI = fc.getWorkingDirectory().toUri();
+ if (Objects.equals(contextURI.getScheme(), uri.getScheme())
+ && Objects.equals(contextURI.getAuthority(), uri.getAuthority())) {
+ // A full URI
+ return new FileContextLocation(this, fc, new Path(uri));
+ }
+
+ if (uri.isAbsolute()) {
+ // Needs to be of the same scheme
+ Preconditions.checkArgument(Objects.equals(contextURI.getScheme(), uri.getScheme()),
+ "Only URI with '%s' scheme is supported", contextURI.getScheme());
+ Path locationPath = new Path(uri).makeQualified(fc.getDefaultFileSystem().getUri(), pathBase);
+ return new FileContextLocation(this, fc, locationPath);
+ }
+
+ return create(uri.getPath());
+ }
+
+ @Override
+ public Location getHomeLocation() {
+ return new FileContextLocation(this, fc, fc.getHomeDirectory());
+ }
+
+ /**
+ * Returns the {@link FileContext} used by this {@link LocationFactory}.
+ */
+ public FileContext getFileContext() {
+ return fc;
+ }
+
+ /**
+ * Returns the {@link Configuration} used by this {@link LocationFactory}.
+ */
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ private static FileContext createFileContext(Configuration configuration) {
+ try {
+ return FileContext.getFileContext(configuration);
+ } catch (UnsupportedFileSystemException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
index 818fe23..aa29384 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -35,7 +35,7 @@ import java.util.List;
import java.util.UUID;
/**
- * A concrete implementation of {@link Location} for the HDFS filesystem.
+ * A concrete implementation of {@link Location} for the HDFS filesystem using {@link FileSystem}.
*/
final class HDFSLocation implements Location {
private final FileSystem fs;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
index 65146a8..728de32 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocationFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.twill.filesystem;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -24,10 +25,14 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
+import java.util.Objects;
/**
- * A {@link LocationFactory} that creates HDFS {@link Location}.
+ * A {@link LocationFactory} that creates HDFS {@link Location} using {@link FileSystem}.
+ *
+ * @deprecated Deprecated since 0.7.0. Use {@link FileContextLocationFactory} instead.
*/
+@Deprecated
public final class HDFSLocationFactory implements LocationFactory {
private final FileSystem fileSystem;
@@ -63,14 +68,21 @@ public final class HDFSLocationFactory implements LocationFactory {
@Override
public Location create(URI uri) {
- if (!uri.toString().startsWith(fileSystem.getUri().toString())) {
+ URI fsURI = fileSystem.getUri();
+ if (Objects.equals(fsURI.getScheme(), uri.getScheme())
+ && Objects.equals(fsURI.getAuthority(), uri.getAuthority())) {
// It's a full URI
return new HDFSLocation(this, new Path(uri));
}
+
if (uri.isAbsolute()) {
+ // Needs to be of the same scheme
+ Preconditions.checkArgument(Objects.equals(fsURI.getScheme(), uri.getScheme()),
+ "Only URI with '%s' scheme is supported", fsURI.getScheme());
return new HDFSLocation(this, new Path(fileSystem.getUri() + uri.getPath()));
}
- return new HDFSLocation(this, new Path(fileSystem.getUri() + "/" + pathBase + "/" + uri.getPath()));
+
+ return create(uri.getPath());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
new file mode 100644
index 0000000..e4c3774
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/FileContextLocationTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
+ *
+ */
+public class FileContextLocationTest extends LocationTestBase {
+
+ private static MiniDFSCluster dfsCluster;
+
+ @BeforeClass
+ public static void init() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ }
+
+ @AfterClass
+ public static void finish() {
+ dfsCluster.shutdown();
+ }
+
+ @Override
+ protected LocationFactory createLocationFactory(String pathBase) throws Exception {
+ return new FileContextLocationFactory(dfsCluster.getFileSystem().getConf(), pathBase);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
index 20f7403..d57d49f 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/HDFSLocationTest.java
@@ -30,14 +30,12 @@ import java.io.IOException;
public class HDFSLocationTest extends LocationTestBase {
private static MiniDFSCluster dfsCluster;
- private static LocationFactory locationFactory;
@BeforeClass
public static void init() throws IOException {
Configuration conf = new Configuration();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
- locationFactory = new HDFSLocationFactory(dfsCluster.getFileSystem());
}
@AfterClass
@@ -46,7 +44,7 @@ public class HDFSLocationTest extends LocationTestBase {
}
@Override
- protected LocationFactory getLocationFactory() {
- return locationFactory;
+ protected LocationFactory createLocationFactory(String pathBase) throws Exception {
+ return new HDFSLocationFactory(dfsCluster.getFileSystem(), pathBase);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
index 3f6d931..ba21beb 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
@@ -17,24 +17,17 @@
*/
package org.apache.twill.filesystem;
-import org.junit.BeforeClass;
-
-import java.io.IOException;
+import java.io.File;
/**
*
*/
public class LocalLocationTest extends LocationTestBase {
- private static LocationFactory locationFactory;
-
- @BeforeClass
- public static void init() throws IOException {
- locationFactory = new LocalLocationFactory(tmpFolder.newFolder());
- }
-
@Override
- protected LocationFactory getLocationFactory() {
- return locationFactory;
+ protected LocationFactory createLocationFactory(String pathBase) throws Exception {
+ File basePath = new File(tmpFolder.newFolder(), pathBase);
+ basePath.mkdirs();
+ return new LocalLocationFactory(basePath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/388a6d92/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
index ee591e7..e01115b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
+++ b/twill-yarn/src/test/java/org/apache/twill/filesystem/LocationTestBase.java
@@ -17,13 +17,23 @@
*/
package org.apache.twill.filesystem;
+import com.google.common.base.Charsets;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.io.CharStreams;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
/**
@@ -34,10 +44,60 @@ public abstract class LocationTestBase {
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
+ private final LoadingCache<String, LocationFactory> locationFactoryCache = CacheBuilder.newBuilder()
+ .build(new CacheLoader<String, LocationFactory>() {
+ @Override
+ public LocationFactory load(String key) throws Exception {
+ return createLocationFactory(key);
+ }
+ });
+
+ @Test
+ public void testBasic() throws Exception {
+ LocationFactory factory = locationFactoryCache.getUnchecked("basic");
+ URI baseURI = factory.create("/").toURI();
+
+ // Test basic location construction
+ Assert.assertEquals(factory.create("/file"), factory.create("/file"));
+ Assert.assertEquals(factory.create("/file2"),
+ factory.create(URI.create(baseURI.getScheme() + ":" + baseURI.getPath() + "/file2")));
+ Assert.assertEquals(factory.create("/file3"),
+ factory.create(
+ new URI(baseURI.getScheme(), baseURI.getAuthority(),
+ baseURI.getPath() + "/file3", null, null)));
+ Assert.assertEquals(factory.create("/"), factory.create("/"));
+ Assert.assertEquals(factory.create("/"), factory.create(URI.create(baseURI.getScheme() + ":" + baseURI.getPath())));
+
+ Assert.assertEquals(factory.create("/"),
+ factory.create(new URI(baseURI.getScheme(), baseURI.getAuthority(),
+ baseURI.getPath(), null, null)));
+
+ // Test file creation and rename
+ Location location = factory.create("/file");
+ Assert.assertTrue(location.createNew());
+ Assert.assertTrue(location.exists());
+
+ Location location2 = factory.create("/file2");
+ String message = "Testing Message";
+ try (Writer writer = new OutputStreamWriter(location2.getOutputStream(), Charsets.UTF_8)) {
+ writer.write(message);
+ }
+ long length = location2.length();
+ long lastModified = location2.lastModified();
+
+ location2.renameTo(location);
+
+ Assert.assertFalse(location2.exists());
+ try (Reader reader = new InputStreamReader(location.getInputStream(), Charsets.UTF_8)) {
+ Assert.assertEquals(message, CharStreams.toString(reader));
+ }
+ Assert.assertEquals(length, location.length());
+ Assert.assertEquals(lastModified, location.lastModified());
+ }
@Test
public void testDelete() throws IOException {
- LocationFactory factory = getLocationFactory();
+ LocationFactory factory = locationFactoryCache.getUnchecked("delete");
Location base = factory.create("test").getTempFile(".tmp");
Assert.assertTrue(base.mkdirs());
@@ -57,7 +117,7 @@ public abstract class LocationTestBase {
@Test
public void testHelper() {
- LocationFactory factory = LocationFactories.namespace(getLocationFactory(), "testhelper");
+ LocationFactory factory = LocationFactories.namespace(locationFactoryCache.getUnchecked("helper"), "testhelper");
Location location = factory.create("test");
Assert.assertTrue(location.toURI().getPath().endsWith("testhelper/test"));
@@ -68,7 +128,7 @@ public abstract class LocationTestBase {
@Test
public void testList() throws IOException {
- LocationFactory factory = getLocationFactory();
+ LocationFactory factory = locationFactoryCache.getUnchecked("list");
Location dir = factory.create("dir");
@@ -107,5 +167,5 @@ public abstract class LocationTestBase {
}
}
- protected abstract LocationFactory getLocationFactory();
+ protected abstract LocationFactory createLocationFactory(String pathBase) throws Exception;
}
[19/22] incubator-twill git commit: Merge branch 'branch-0.7.0'
Posted by ch...@apache.org.
Merge branch 'branch-0.7.0'
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/c6b3f0d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/c6b3f0d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/c6b3f0d4
Branch: refs/heads/site
Commit: c6b3f0d43aff7cd4095edc79f3452eb640b37f79
Parents: 900e382 047e011
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jan 11 10:35:30 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jan 11 10:35:30 2016 -0800
----------------------------------------------------------------------
pom.xml | 2 +-
twill-api/pom.xml | 2 +-
twill-common/pom.xml | 2 +-
twill-core/pom.xml | 2 +-
twill-discovery-api/pom.xml | 2 +-
twill-discovery-core/pom.xml | 2 +-
twill-examples/echo/pom.xml | 2 +-
twill-examples/pom.xml | 2 +-
twill-examples/yarn/pom.xml | 2 +-
twill-ext/pom.xml | 2 +-
twill-java8-test/pom.xml | 2 +-
twill-yarn/pom.xml | 2 +-
twill-zookeeper/pom.xml | 2 +-
13 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
[10/22] incubator-twill git commit: (TWILL-155) Sort file listed from
directory in TwillLauncher
Posted by ch...@apache.org.
(TWILL-155) Sort file listed from directory in TwillLauncher
This gives a deterministic behavior on the ClassLoader created
by TwillLauncher
This closes #71 on GitHub
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/87b063ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/87b063ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/87b063ca
Branch: refs/heads/site
Commit: 87b063cac642d4a605679049faa4e662b71cfc46
Parents: 161032a
Author: Terence Yim <ch...@apache.org>
Authored: Wed Nov 11 14:03:27 2015 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Nov 11 22:20:05 2015 -0800
----------------------------------------------------------------------
.../apache/twill/launcher/TwillLauncher.java | 36 +++++++++++++-------
1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/87b063ca/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
index 3405709..171b22a 100644
--- a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
@@ -36,6 +36,7 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.jar.JarEntry;
@@ -142,18 +143,15 @@ public final class TwillLauncher {
private static URLClassLoader createClassLoader(File dir, boolean useClassPath) {
try {
- List<URL> urls = new ArrayList<URL>();
+ List<URL> urls = new ArrayList<>();
urls.add(dir.toURI().toURL());
urls.add(new File(dir, "classes").toURI().toURL());
urls.add(new File(dir, "resources").toURI().toURL());
File libDir = new File(dir, "lib");
- File[] files = libDir.listFiles();
- if (files != null) {
- for (File file : files) {
- if (file.getName().endsWith(".jar")) {
- urls.add(file.toURI().toURL());
- }
+ for (File file : listFiles(libDir)) {
+ if (file.getName().endsWith(".jar")) {
+ urls.add(file.toURI().toURL());
}
}
@@ -163,7 +161,7 @@ public final class TwillLauncher {
addClassPathsToList(urls, Constants.APPLICATION_CLASSPATH);
- return new URLClassLoader(urls.toArray(new URL[0]));
+ return new URLClassLoader(urls.toArray(new URL[urls.size()]));
} catch (Exception e) {
throw new IllegalStateException(e);
@@ -190,12 +188,12 @@ public final class TwillLauncher {
if (classpath.endsWith("/*")) {
// Grab all .jar files
File dir = new File(classpath.substring(0, classpath.length() - 2));
- File[] files = dir.listFiles();
- if (files == null || files.length == 0) {
+ List<File> files = listFiles(dir);
+ if (files.isEmpty()) {
return singleItem(dir.toURI().toURL());
}
- List<URL> result = new ArrayList<URL>(files.length);
+ List<URL> result = new ArrayList<>(files.size());
for (File file : files) {
if (file.getName().endsWith(".jar")) {
result.add(file.toURI().toURL());
@@ -208,7 +206,7 @@ public final class TwillLauncher {
}
private static Collection<URL> singleItem(URL url) {
- List<URL> result = new ArrayList<URL>(1);
+ List<URL> result = new ArrayList<>(1);
result.add(url);
return result;
}
@@ -233,4 +231,18 @@ public final class TwillLauncher {
}
dir.delete();
}
+
+ /**
+ * Returns a sorted list of {@link File} under the given directory. The list will be empty if
+ * the given directory is empty, not exist or not a directory.
+ */
+ private static List<File> listFiles(File dir) {
+ File[] files = dir.listFiles();
+ if (files == null || files.length == 0) {
+ return Collections.emptyList();
+ }
+ List<File> fileList = Arrays.asList(files);
+ Collections.sort(fileList);
+ return fileList;
+ }
}
[02/22] incubator-twill git commit: Merge branch 'master' into site
Posted by ch...@apache.org.
Merge branch 'master' into site
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/4b7c8481
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/4b7c8481
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/4b7c8481
Branch: refs/heads/site
Commit: 4b7c8481f60a5fc46faaee04cf4c48ba277ead82
Parents: 81a5e46 d2d9081
Author: Terence Yim <ch...@apache.org>
Authored: Thu Aug 6 17:07:57 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Aug 6 17:07:57 2015 -0700
----------------------------------------------------------------------
pom.xml | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4b7c8481/pom.xml
----------------------------------------------------------------------
[13/22] incubator-twill git commit: (TWILL-160) Don’t create parent directory when getting InputStream from a LocalLocation
Posted by ch...@apache.org.
(TWILL-160) Don’t create parent directory when getting InputStream from a LocalLocation
This closes #75 on Github
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b80f9b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b80f9b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b80f9b80
Branch: refs/heads/site
Commit: b80f9b8036b59603f227ba1ae43aa099d5329135
Parents: 65c730b
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jan 4 12:57:18 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 5 11:21:05 2016 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/twill/filesystem/LocalLocation.java | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b80f9b80/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
index a873545..037f4f9 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
@@ -70,10 +70,6 @@ final class LocalLocation implements Location {
*/
@Override
public InputStream getInputStream() throws IOException {
- File parent = file.getParentFile();
- if (!parent.exists()) {
- parent.mkdirs();
- }
return new FileInputStream(file);
}
[18/22] incubator-twill git commit: Prepare for releasing
0.7.0-incubating
Posted by ch...@apache.org.
Prepare for releasing 0.7.0-incubating
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/047e011f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/047e011f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/047e011f
Branch: refs/heads/site
Commit: 047e011f25066805eb820ff661364937ef7ef043
Parents: 900e382
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jan 11 10:17:37 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jan 11 10:17:37 2016 -0800
----------------------------------------------------------------------
pom.xml | 2 +-
twill-api/pom.xml | 2 +-
twill-common/pom.xml | 2 +-
twill-core/pom.xml | 2 +-
twill-discovery-api/pom.xml | 2 +-
twill-discovery-core/pom.xml | 2 +-
twill-examples/echo/pom.xml | 2 +-
twill-examples/pom.xml | 2 +-
twill-examples/yarn/pom.xml | 2 +-
twill-ext/pom.xml | 2 +-
twill-java8-test/pom.xml | 2 +-
twill-yarn/pom.xml | 2 +-
twill-zookeeper/pom.xml | 2 +-
13 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4daa68d..c116322 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
<groupId>org.apache.twill</groupId>
<artifactId>twill-parent</artifactId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
<packaging>pom</packaging>
<name>Apache Twill</name>
<url>http://twill.incubator.apache.org</url>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-api/pom.xml b/twill-api/pom.xml
index c011dee..3cb6d97 100644
--- a/twill-api/pom.xml
+++ b/twill-api/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.twill</groupId>
<artifactId>twill-parent</artifactId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<artifactId>twill-api</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-common/pom.xml
----------------------------------------------------------------------
diff --git a/twill-common/pom.xml b/twill-common/pom.xml
index b371acb..d1acb55 100644
--- a/twill-common/pom.xml
+++ b/twill-common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index 26ae718..e6b5514 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-api/pom.xml b/twill-discovery-api/pom.xml
index 147c6bc..a38aff3 100644
--- a/twill-discovery-api/pom.xml
+++ b/twill-discovery-api/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/pom.xml b/twill-discovery-core/pom.xml
index fb3dcbb..f80510d 100644
--- a/twill-discovery-core/pom.xml
+++ b/twill-discovery-core/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-examples/echo/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/echo/pom.xml b/twill-examples/echo/pom.xml
index 8ba89ff..474127e 100644
--- a/twill-examples/echo/pom.xml
+++ b/twill-examples/echo/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
<parent>
<artifactId>twill-examples</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<name>Apache Twill examples: Echo</name>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-examples/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/pom.xml b/twill-examples/pom.xml
index 10163c2..3e2929b 100644
--- a/twill-examples/pom.xml
+++ b/twill-examples/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
<parent>
<groupId>org.apache.twill</groupId>
<artifactId>twill-parent</artifactId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<artifactId>twill-examples</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-examples/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/yarn/pom.xml b/twill-examples/yarn/pom.xml
index 71f86e8..dea3777 100644
--- a/twill-examples/yarn/pom.xml
+++ b/twill-examples/yarn/pom.xml
@@ -24,7 +24,7 @@ limitations under the License.
<parent>
<artifactId>twill-examples</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<name>Apache Twill examples: YARN</name>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-ext/pom.xml
----------------------------------------------------------------------
diff --git a/twill-ext/pom.xml b/twill-ext/pom.xml
index 539fd20..0eb45cd 100644
--- a/twill-ext/pom.xml
+++ b/twill-ext/pom.xml
@@ -22,7 +22,7 @@ limitations under the License.
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-java8-test/pom.xml
----------------------------------------------------------------------
diff --git a/twill-java8-test/pom.xml b/twill-java8-test/pom.xml
index 892a025..65b1c6e 100644
--- a/twill-java8-test/pom.xml
+++ b/twill-java8-test/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.twill</groupId>
<artifactId>twill-parent</artifactId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<artifactId>twill-java8-test</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
index 9d61831..1df959a 100644
--- a/twill-yarn/pom.xml
+++ b/twill-yarn/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/047e011f/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
index 6cf302d..9edc990 100644
--- a/twill-zookeeper/pom.xml
+++ b/twill-zookeeper/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-incubating</version>
</parent>
<modelVersion>4.0.0</modelVersion>
[04/22] incubator-twill git commit: (TWILL-141) Fix namespacing of
ZKClient
Posted by ch...@apache.org.
(TWILL-141) Fix namespacing of ZKClient
- Not to fail with exception when creating “/“ through the
namespaced ZKClient
- Return the correct path in OperationFuture.getRequestPath() for
futures returned from namespaced ZKClient
This closes #64 from GitHub.
Signed-off-by: hsaputra <hs...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/f88e18f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/f88e18f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/f88e18f7
Branch: refs/heads/site
Commit: f88e18f7587aae3528b1e47a22b0b281ff91f95e
Parents: 66402b4
Author: Terence Yim <ch...@apache.org>
Authored: Thu Sep 24 03:22:24 2015 -0700
Committer: hsaputra <hs...@apache.org>
Committed: Fri Sep 25 12:17:53 2015 -0700
----------------------------------------------------------------------
.../internal/zookeeper/NamespaceZKClient.java | 33 ++++++++----
.../zookeeper/SettableOperationFuture.java | 2 +-
.../apache/twill/zookeeper/ZKClientTest.java | 55 ++++++++++++++++++++
3 files changed, 78 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/f88e18f7/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
index e19bb0a..239a656 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java
@@ -70,47 +70,58 @@ public final class NamespaceZKClient extends ForwardingZKClient {
@Override
public OperationFuture<String> create(String path, @Nullable byte[] data,
CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
- return relayPath(delegate.create(namespace + path, data, createMode, createParent, acl),
+ return relayPath(delegate.create(getNamespacedPath(path), data, createMode, createParent, acl),
this.<String>createFuture(path));
}
@Override
public OperationFuture<Stat> exists(String path, @Nullable Watcher watcher) {
- return relayFuture(delegate.exists(namespace + path, watcher), this.<Stat>createFuture(path));
+ return relayFuture(delegate.exists(getNamespacedPath(path), watcher), this.<Stat>createFuture(path));
}
@Override
public OperationFuture<NodeChildren> getChildren(String path, @Nullable Watcher watcher) {
- return relayFuture(delegate.getChildren(namespace + path, watcher), this.<NodeChildren>createFuture(path));
+ return relayFuture(delegate.getChildren(getNamespacedPath(path), watcher), this.<NodeChildren>createFuture(path));
}
@Override
public OperationFuture<NodeData> getData(String path, @Nullable Watcher watcher) {
- return relayFuture(delegate.getData(namespace + path, watcher), this.<NodeData>createFuture(path));
+ return relayFuture(delegate.getData(getNamespacedPath(path), watcher), this.<NodeData>createFuture(path));
}
@Override
public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
- return relayFuture(delegate.setData(namespace + dataPath, data, version), this.<Stat>createFuture(dataPath));
+ return relayFuture(delegate.setData(getNamespacedPath(dataPath), data, version), this.<Stat>createFuture(dataPath));
}
@Override
public OperationFuture<String> delete(String deletePath, int version) {
- return relayPath(delegate.delete(namespace + deletePath, version), this.<String>createFuture(deletePath));
+ return relayPath(delegate.delete(getNamespacedPath(deletePath), version), this.<String>createFuture(deletePath));
}
@Override
public OperationFuture<ACLData> getACL(String path) {
- return relayFuture(delegate.getACL(namespace + path), this.<ACLData>createFuture(path));
+ return relayFuture(delegate.getACL(getNamespacedPath(path)), this.<ACLData>createFuture(path));
}
@Override
public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
- return relayFuture(delegate.setACL(namespace + path, acl, version), this.<Stat>createFuture(path));
+ return relayFuture(delegate.setACL(getNamespacedPath(path), acl, version), this.<Stat>createFuture(path));
+ }
+
+ /**
+ * Returns the namespaced path for the given path. The returned path should be used when performing
+ * ZK operations with the delegating ZKClient.
+ */
+ private String getNamespacedPath(String path) {
+ if ("/".equals(path)) {
+ return namespace;
+ }
+ return namespace + path;
}
private <V> SettableOperationFuture<V> createFuture(String path) {
- return SettableOperationFuture.create(namespace + path, Threads.SAME_THREAD_EXECUTOR);
+ return SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
}
private <V> OperationFuture<V> relayFuture(final OperationFuture<V> from, final SettableOperationFuture<V> to) {
@@ -134,8 +145,8 @@ public final class NamespaceZKClient extends ForwardingZKClient {
@Override
public void run() {
try {
- String path = from.get();
- to.set(path.substring(namespace.length()));
+ String relativePath = from.get().substring(namespace.length());
+ to.set(relativePath.isEmpty() ? "/" : relativePath);
} catch (Exception e) {
to.setException(e.getCause());
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/f88e18f7/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
index 06f089e..f98b8f6 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java
@@ -35,7 +35,7 @@ public final class SettableOperationFuture<V> extends AbstractFuture<V> implemen
private final Executor executor;
public static <V> SettableOperationFuture<V> create(String path, Executor executor) {
- return new SettableOperationFuture<V>(path, executor);
+ return new SettableOperationFuture<>(path, executor);
}
private SettableOperationFuture(String requestPath, Executor executor) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/f88e18f7/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index a9120c3..b9cb8a4 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -394,4 +394,59 @@ public class ZKClientTest {
serverThread.interrupt();
}
}
+
+ @Test
+ public void testNamespace() throws ExecutionException, InterruptedException {
+ InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build();
+ zkServer.startAndWait();
+
+ try {
+ ZKClientService zkClient = ZKClientService.Builder
+ .of(zkServer.getConnectionStr())
+ .build();
+ zkClient.startAndWait();
+
+ ZKClient zk = ZKClients.namespace(zkClient, "/test");
+ // Create the "/ should create the "/test" from the root
+ OperationFuture<String> createFuture = zk.create("/", null, CreateMode.PERSISTENT);
+ // Shouldn't have namespace as prefix for path returned from the future.
+ Assert.assertEquals("/", createFuture.getRequestPath());
+ Assert.assertEquals("/", createFuture.get());
+
+ // Create a path under the namespace
+ createFuture = zk.create("/subpath", null, CreateMode.PERSISTENT);
+ Assert.assertEquals("/subpath", createFuture.getRequestPath());
+ Assert.assertEquals("/subpath", createFuture.get());
+
+ // Check for exists
+ OperationFuture<Stat> existsFuture = zk.exists("/subpath");
+ Assert.assertEquals("/subpath", existsFuture.getRequestPath());
+ Assert.assertNotNull(existsFuture.get());
+
+ // Put some data
+ OperationFuture<Stat> setFuture = zk.setData("/subpath", "hello".getBytes());
+ Assert.assertEquals("/subpath", setFuture.getRequestPath());
+ Assert.assertNotNull(setFuture.get());
+
+ // Read the data back
+ OperationFuture<NodeData> getFuture = zk.getData("/subpath");
+ Assert.assertEquals("/subpath", getFuture.getRequestPath());
+ Assert.assertArrayEquals("hello".getBytes(), getFuture.get().getData());
+
+ // Delete the sub path
+ OperationFuture < String > deleteFuture = zk.delete("/subpath");
+ Assert.assertEquals("/subpath", deleteFuture.getRequestPath());
+ Assert.assertEquals("/subpath", deleteFuture.get());
+
+ // Delete the namespace root
+ deleteFuture = zk.delete("/");
+ Assert.assertEquals("/", deleteFuture.getRequestPath());
+ Assert.assertEquals("/", deleteFuture.get());
+
+ // The namespace must be gone
+ Assert.assertNull(zkClient.exists("/test").get());
+ } finally {
+ zkServer.stopAndWait();
+ }
+ }
}
[07/22] incubator-twill git commit: Remove the if-check in the
YarnTwillPreparer class for log level when setting env key.
Posted by ch...@apache.org.
Remove the if-check in the YarnTwillPreparer class for log level when setting env key.
Add precondition check in the setLogLevel method to prevent setting null early.
This closes #67
Signed-off-by: hsaputra <hs...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/f6d2b6c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/f6d2b6c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/f6d2b6c4
Branch: refs/heads/site
Commit: f6d2b6c427bd7ccec3951cfdeeb2b0e2db12e6da
Parents: e95c6a4
Author: hsaputra <hs...@apache.org>
Authored: Fri Oct 9 13:23:28 2015 -0700
Committer: hsaputra <hs...@apache.org>
Committed: Fri Oct 9 20:31:30 2015 -0700
----------------------------------------------------------------------
.../main/java/org/apache/twill/yarn/YarnTwillPreparer.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/f6d2b6c4/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 6da2f8b..d4edfeb 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -300,6 +300,7 @@ final class YarnTwillPreparer implements TwillPreparer {
@Override
public TwillPreparer setLogLevel(LogEntry.Level logLevel) {
+ Preconditions.checkNotNull(logLevel);
this.logLevel = logLevel;
return this;
}
@@ -350,10 +351,8 @@ final class YarnTwillPreparer implements TwillPreparer {
.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
.put(EnvKeys.YARN_RM_SCHEDULER_ADDRESS, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS));
- if (logLevel != null) {
- LOG.debug("Log level is set to {} for the Twill application.", logLevel);
- builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());
- }
+ LOG.debug("Log level is set to {} for the Twill application.", logLevel);
+ builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());
int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);
[06/22] incubator-twill git commit: (TWILL-148) Allow setting of env
variables - Added methods to TwillPreparer for setting env for runnables
Posted by ch...@apache.org.
(TWILL-148) Allow setting of env variables - Added methods to TwillPreparer for setting env for runnables
This closes #69 on github
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/e95c6a49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/e95c6a49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/e95c6a49
Branch: refs/heads/site
Commit: e95c6a495e0faef7569adbeae2d768f57391b44f
Parents: ef8b1ea
Author: Terence Yim <ch...@apache.org>
Authored: Fri Oct 9 18:27:17 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Oct 9 19:34:02 2015 -0700
----------------------------------------------------------------------
.../org/apache/twill/api/TwillPreparer.java | 20 ++++
.../org/apache/twill/internal/Constants.java | 1 +
.../appmaster/ApplicationMasterService.java | 46 +++++---
.../apache/twill/yarn/YarnTwillPreparer.java | 55 ++++++++-
.../apache/twill/yarn/EnvironmentTestRun.java | 111 +++++++++++++++++++
5 files changed, 219 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
index f60080a..d7d5529 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java
@@ -21,6 +21,7 @@ import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import java.net.URI;
+import java.util.Map;
/**
* This interface exposes methods to set up the Twill runtime environment and start a Twill application.
@@ -179,6 +180,25 @@ public interface TwillPreparer {
TwillPreparer withClassPaths(Iterable<String> classPaths);
/**
+ * Adds the set of environment variables that will be set as container environment variables for all runnables.
+ *
+ * @param env set of environment variables
+ * @return This {@link TwillPreparer}
+ */
+ TwillPreparer withEnv(Map<String, String> env);
+
+ /**
+ * Adds the set of environment variables that will be set as container environment variables for the given runnable.
+ * Environment variables set through this method has higher precedence than the one set through {@link #withEnv(Map)}
+ * if there is a key clash.
+ *
+ * @param runnableName Name of the {@link TwillRunnable}.
+ * @param env set of environment variables
+ * @return This {@link TwillPreparer}
+ */
+ TwillPreparer withEnv(String runnableName, Map<String, String> env);
+
+ /**
* Adds the set of paths to the classpath on the target machine for ApplicationMaster and all runnables.
* @return This {@link TwillPreparer}
*/
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index f897bfa..39de851 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -62,6 +62,7 @@ public final class Constants {
public static final String LOCALIZE_FILES = "localizeFiles.json";
public static final String TWILL_SPEC = "twillSpec.json";
public static final String ARGUMENTS = "arguments.json";
+ public static final String ENVIRONMENTS = "environments.json";
public static final String LOGBACK_TEMPLATE = "logback-template.xml";
public static final String JVM_OPTIONS = "jvm.opts";
public static final String CREDENTIALS = "credentials.store";
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 355cea3..e1523d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.DiscreteDomains;
import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -88,7 +87,9 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -122,6 +123,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
private final EventHandler eventHandler;
private final Location applicationLocation;
private final PlacementPolicyManager placementPolicyManager;
+ private final Map<String, Map<String, String>> environments;
private volatile boolean stopped;
private Queue<RunnableContainerRequest> runnableContainerRequests;
@@ -140,6 +142,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
this.jvmOpts = loadJvmOptions();
this.reservedMemory = getReservedMemory();
this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
+ this.environments = getEnvironments();
this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
@@ -634,18 +637,22 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
int containerCount = expectedContainers.getExpected(runnableName);
- ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(
- ImmutableMap.<String, String>builder()
- .put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR))
- .put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER))
- .put(EnvKeys.TWILL_APP_RUN_ID, runId.getId())
- .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
- .put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL))
- .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
- .put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect())
- .build()
- , getLocalizeFiles(), credentials
- );
+ // Setup container environment variables
+ Map<String, String> env = new LinkedHashMap<>();
+ if (environments.containsKey(runnableName)) {
+ env.putAll(environments.get(runnableName));
+ }
+ // Override with system env
+ env.put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR));
+ env.put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER));
+ env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
+ env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
+ env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
+ env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString());
+ env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
+
+ ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
+ credentials);
TwillContainerLauncher launcher = new TwillContainerLauncher(
twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
@@ -679,6 +686,19 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
}
}
+ private Map<String, Map<String, String>> getEnvironments() {
+ File envFile = new File(Constants.Files.ENVIRONMENTS);
+ if (!envFile.exists()) {
+ return new HashMap<>();
+ }
+
+ try (Reader reader = Files.newReader(envFile, Charsets.UTF_8)) {
+ return new Gson().fromJson(reader, new TypeToken<Map<String, Map<String, String>>>() { }.getType());
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
private String getZKNamespace(String runnableName) {
return String.format("/%s/runnables/%s", runId.getId(), runnableName);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index ea77116..6da2f8b 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.OutputSupplier;
import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -52,7 +53,6 @@ import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.filesystem.HDFSLocationFactory;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.ApplicationBundler;
@@ -97,6 +97,8 @@ import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -125,6 +127,7 @@ final class YarnTwillPreparer implements TwillPreparer {
private final List<URI> resources = Lists.newArrayList();
private final List<String> classPaths = Lists.newArrayList();
private final ListMultimap<String, String> runnableArgs = ArrayListMultimap.create();
+ private final Map<String, Map<String, String>> environments = new HashMap<>();
private final List<String> applicationClassPaths = Lists.newArrayList();
private final Credentials credentials;
private final int reservedMemory;
@@ -214,6 +217,8 @@ final class YarnTwillPreparer implements TwillPreparer {
@Override
public TwillPreparer withArguments(String runnableName, Iterable<String> args) {
+ Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName),
+ "Runnable %s is not defined in the application.", runnableName);
runnableArgs.putAll(runnableName, args);
return this;
}
@@ -252,6 +257,23 @@ final class YarnTwillPreparer implements TwillPreparer {
}
@Override
+ public TwillPreparer withEnv(Map<String, String> env) {
+ // Add the given environments to all runnables
+ for (String runnableName : twillSpec.getRunnables().keySet()) {
+ setEnv(runnableName, env, false);
+ }
+ return this;
+ }
+
+ @Override
+ public TwillPreparer withEnv(String runnableName, Map<String, String> env) {
+ Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName),
+ "Runnable %s is not defined in the application.", runnableName);
+ setEnv(runnableName, env, true);
+ return this;
+ }
+
+ @Override
public TwillPreparer withApplicationClassPaths(String... classPaths) {
return withApplicationClassPaths(ImmutableList.copyOf(classPaths));
}
@@ -306,6 +328,7 @@ final class YarnTwillPreparer implements TwillPreparer {
saveLauncher(localFiles);
saveJvmOptions(localFiles);
saveArguments(new Arguments(arguments, runnableArgs), localFiles);
+ saveEnvironments(localFiles);
saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
Constants.Files.LOGBACK_TEMPLATE,
Constants.Files.CONTAINER_JAR,
@@ -360,6 +383,21 @@ final class YarnTwillPreparer implements TwillPreparer {
}
}
+ private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) {
+ Map<String, String> environment = environments.get(runnableName);
+ if (environment == null) {
+ environment = new LinkedHashMap<>(env);
+ environments.put(runnableName, environment);
+ return;
+ }
+
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ if (overwrite || !environment.containsKey(entry.getKey())) {
+ environment.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
private Credentials createCredentials() {
Credentials credentials = new Credentials();
@@ -596,6 +634,21 @@ final class YarnTwillPreparer implements TwillPreparer {
localFiles.put(Constants.Files.ARGUMENTS, createLocalFile(Constants.Files.ARGUMENTS, location));
}
+ private void saveEnvironments(Map<String, LocalFile> localFiles) throws IOException {
+ if (environments.isEmpty()) {
+ return;
+ }
+
+ LOG.debug("Create and copy {}", Constants.Files.ENVIRONMENTS);
+ final Location location = createTempLocation(Constants.Files.ENVIRONMENTS);
+ try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) {
+ new Gson().toJson(environments, writer);
+ }
+ LOG.debug("Done {}", Constants.Files.ENVIRONMENTS);
+
+ localFiles.put(Constants.Files.ENVIRONMENTS, createLocalFile(Constants.Files.ENVIRONMENTS, location));
+ }
+
/**
* Serializes the list of files that needs to localize from AM to Container.
*/
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java
new file mode 100644
index 0000000..4309cb4
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.LineReader;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit test for testing environment settings.
+ */
+public class EnvironmentTestRun extends BaseYarnTest {
+
+ @Test
+ public void testEnv() throws Exception {
+ TwillRunner runner = getTwillRunner();
+
+ TwillController controller = runner.prepare(new EchoApp())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("echo1", "echo1")
+ .withArguments("echo2", "echo2")
+ .withEnv(ImmutableMap.of("GREETING", "Hello"))
+ .withEnv("echo2", ImmutableMap.of("GREETING", "Hello2"))
+ .start();
+
+ // Service echo1 should returns "Hello" as greeting, echo2 should returns "Hello2"
+ Map<String, String> runnableGreetings = ImmutableMap.of("echo1", "Hello", "echo2", "Hello2");
+ for (Map.Entry<String, String> entry : runnableGreetings.entrySet()) {
+ Discoverable discoverable = getDiscoverable(controller.discoverService(entry.getKey()), 60, TimeUnit.SECONDS);
+ try (
+ Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
+ discoverable.getSocketAddress().getPort())
+ ) {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+ writer.println("GREETING");
+ Assert.assertEquals(entry.getValue(), reader.readLine());
+ }
+ }
+
+ controller.terminate().get();
+ }
+
+ private Discoverable getDiscoverable(ServiceDiscovered serviceDiscovered,
+ long timeout, TimeUnit unit) throws Exception {
+ final SettableFuture<Discoverable> completion = SettableFuture.create();
+ serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
+ @Override
+ public void onChange(ServiceDiscovered serviceDiscovered) {
+ Iterator<Discoverable> itor = serviceDiscovered.iterator();
+ if (itor.hasNext()) {
+ completion.set(itor.next());
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ return completion.get(timeout, unit);
+ }
+
+ /**
+ * Application to add two {@link EnvironmentEchoServer} for testing.
+ */
+ public static final class EchoApp implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("EchoApp")
+ .withRunnable()
+ .add("echo1", new EnvironmentEchoServer()).noLocalFiles()
+ .add("echo2", new EnvironmentEchoServer()).noLocalFiles()
+ .anyOrder()
+ .build();
+ }
+ }
+}
[22/22] incubator-twill git commit: Merge branch 'master' into site
Posted by ch...@apache.org.
Merge branch 'master' into site
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/87e2b465
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/87e2b465
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/87e2b465
Branch: refs/heads/site
Commit: 87e2b4652ec6da690a8dbad6de88080bc9cf6154
Parents: 4b7c848 180e446
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jan 26 11:15:56 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 26 11:15:56 2016 -0800
----------------------------------------------------------------------
NOTICE | 2 +-
pom.xml | 4 +-
twill-api/pom.xml | 2 +-
.../org/apache/twill/api/TwillPreparer.java | 20 ++
twill-common/pom.xml | 2 +-
.../apache/twill/filesystem/LocalLocation.java | 15 +-
.../twill/filesystem/LocalLocationFactory.java | 4 +-
.../org/apache/twill/internal/Constants.java | 83 +++++++
twill-core/pom.xml | 2 +-
.../twill/internal/AbstractTwillService.java | 2 +-
.../internal/AbstractZKServiceController.java | 3 +-
.../org/apache/twill/internal/Constants.java | 76 -------
.../apache/twill/internal/ContainerInfo.java | 15 +-
.../apache/twill/internal/ProcessLauncher.java | 2 +-
.../twill/internal/ResourceCapability.java | 34 +++
.../twill/internal/TwillContainerLauncher.java | 19 +-
.../internal/json/TwillRunResourcesCodec.java | 14 +-
.../kafka/client/SimpleKafkaConsumer.java | 63 +++++-
.../apache/twill/internal/utils/Resources.java | 46 ++++
.../apache/twill/launcher/TwillLauncher.java | 36 ++-
.../apache/twill/kafka/client/KafkaTest.java | 68 ++++++
twill-discovery-api/pom.xml | 2 +-
twill-discovery-core/pom.xml | 2 +-
.../twill/discovery/ZKDiscoveryService.java | 4 +-
.../twill/discovery/ZKDiscoveryServiceTest.java | 2 +-
twill-examples/echo/pom.xml | 2 +-
twill-examples/pom.xml | 2 +-
twill-examples/yarn/pom.xml | 2 +-
twill-ext/pom.xml | 2 +-
twill-java8-test/pom.xml | 2 +-
twill-yarn/pom.xml | 2 +-
.../internal/yarn/Hadoop20YarnAppClient.java | 23 +-
.../internal/yarn/Hadoop21YarnAppClient.java | 24 +-
.../twill/filesystem/FileContextLocation.java | 219 +++++++++++++++++++
.../filesystem/FileContextLocationFactory.java | 119 ++++++++++
.../apache/twill/filesystem/HDFSLocation.java | 4 +-
.../twill/filesystem/HDFSLocationFactory.java | 18 +-
.../org/apache/twill/internal/ServiceMain.java | 40 ++--
.../appmaster/ApplicationMasterInfo.java | 63 ++++++
.../appmaster/ApplicationMasterMain.java | 89 +++++++-
.../ApplicationMasterProcessLauncher.java | 24 +-
.../appmaster/ApplicationMasterService.java | 74 +++++--
.../appmaster/ApplicationSubmitter.java | 3 +-
.../internal/container/TwillContainerMain.java | 5 +-
.../yarn/AbstractYarnProcessLauncher.java | 3 +-
.../twill/internal/yarn/YarnAppClient.java | 11 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 87 ++++++--
.../twill/yarn/YarnTwillRunnerService.java | 4 +-
.../filesystem/FileContextLocationTest.java | 50 +++++
.../twill/filesystem/HDFSLocationTest.java | 6 +-
.../twill/filesystem/LocalLocationTest.java | 17 +-
.../twill/filesystem/LocationTestBase.java | 68 +++++-
.../org/apache/twill/yarn/BaseYarnTest.java | 4 +
.../apache/twill/yarn/ContainerSizeTestRun.java | 83 ++++++-
.../apache/twill/yarn/EchoServerTestRun.java | 68 +++++-
.../apache/twill/yarn/EnvironmentTestRun.java | 111 ++++++++++
.../twill/yarn/ServiceDiscoveryTestRun.java | 91 ++++----
.../org/apache/twill/yarn/SocketServer.java | 13 +-
.../java/org/apache/twill/yarn/TwillTester.java | 4 +
.../org/apache/twill/yarn/YarnTestSuite.java | 32 +--
twill-zookeeper/pom.xml | 2 +-
.../internal/zookeeper/NamespaceZKClient.java | 33 ++-
.../zookeeper/SettableOperationFuture.java | 2 +-
.../apache/twill/zookeeper/ZKClientTest.java | 75 ++++++-
64 files changed, 1630 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/87e2b465/pom.xml
----------------------------------------------------------------------
[16/22] incubator-twill git commit: (TWILL-151) Improve Logging error
when fetching message after Kafka server is stopped
Posted by ch...@apache.org.
(TWILL-151) Improve Logging error when fetching message after Kafka server is stopped
This closes #65 on GitHub.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/323e5af4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/323e5af4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/323e5af4
Branch: refs/heads/site
Commit: 323e5af4e536f3b5f306a5c59cd6787bf3a99818
Parents: b740da4
Author: sanojkodikkara <sa...@gmail.com>
Authored: Wed Sep 30 16:23:08 2015 +0530
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Jan 6 10:55:36 2016 -0800
----------------------------------------------------------------------
.../internal/kafka/client/SimpleKafkaConsumer.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/323e5af4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 8cfe889..0299e56 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
+
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
@@ -39,6 +40,7 @@ import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
+
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.BrokerInfo;
@@ -49,6 +51,7 @@ import org.apache.twill.kafka.client.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.ConnectException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Iterator;
import java.util.List;
@@ -374,9 +377,15 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
invokeCallback(messages, offset);
backoff.reset();
} catch (Throwable t) {
- if (running || !(t instanceof ClosedByInterruptException)) {
- // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
- LOG.info("Exception when fetching message on {}.", topicPart, t);
+ // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
+ if (!running) {
+ LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
+ } else {
+ if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
+ LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart);
+ } else {
+ LOG.info("Exception when fetching message on {}.", topicPart, t);
+ }
backoff.backoff();
}
consumers.refresh(consumerEntry.getKey());
[09/22] incubator-twill git commit: Added missing test to
YarnTestSuite.
Posted by ch...@apache.org.
Added missing test to YarnTestSuite.
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/161032a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/161032a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/161032a2
Branch: refs/heads/site
Commit: 161032a209535bf08265df7107a345b15592d886
Parents: e4a3676
Author: Terence Yim <ch...@apache.org>
Authored: Fri Oct 23 14:09:24 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Oct 23 14:09:24 2015 -0700
----------------------------------------------------------------------
.../org/apache/twill/yarn/YarnTestSuite.java | 32 +++++++++++---------
1 file changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/161032a2/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index d9efe3c..bea99d0 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -25,21 +25,23 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
- PlacementPolicyTestRun.class,
- EchoServerTestRun.class,
- ResourceReportTestRun.class,
- TaskCompletedTestRun.class,
- DistributeShellTestRun.class,
- LocalFileTestRun.class,
- FailureRestartTestRun.class,
- ProvisionTimeoutTestRun.class,
- LogHandlerTestRun.class,
- SessionExpireTestRun.class,
- ServiceDiscoveryTestRun.class,
- DebugTestRun.class,
- ContainerSizeTestRun.class,
- InitializeFailTestRun.class
- })
+ ContainerSizeTestRun.class,
+ DebugTestRun.class,
+ DistributeShellTestRun.class,
+ EchoServerTestRun.class,
+ EnvironmentTestRun.class,
+ FailureRestartTestRun.class,
+ InitializeFailTestRun.class,
+ LocalFileTestRun.class,
+ LogHandlerTestRun.class,
+ LogLevelTestRun.class,
+ PlacementPolicyTestRun.class,
+ ProvisionTimeoutTestRun.class,
+ ResourceReportTestRun.class,
+ ServiceDiscoveryTestRun.class,
+ SessionExpireTestRun.class,
+ TaskCompletedTestRun.class
+})
public final class YarnTestSuite extends BaseYarnTest {
}
[11/22] incubator-twill git commit: (TWILL-156) use Files.move
instead of File.renameTo so we can have options to replace existing files and
perform atomic move, this allows us to support windows rename
Posted by ch...@apache.org.
(TWILL-156) use Files.move instead of File.renameTo so we can have options to replace existing files and perform atomic move, this allows us to support windows rename
This closes #72 on GitHub.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/359b12b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/359b12b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/359b12b9
Branch: refs/heads/site
Commit: 359b12b904e0c52c54453c6afc5ef578d484c0a5
Parents: 87b063c
Author: shankar <sh...@cask.co>
Authored: Fri Dec 4 15:46:59 2015 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Dec 4 23:56:58 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/twill/filesystem/LocalLocation.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/359b12b9/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
index a560694..a873545 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
@@ -25,6 +25,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
@@ -176,9 +179,11 @@ final class LocalLocation implements Location {
@Override
public Location renameTo(Location destination) throws IOException {
// destination will always be of the same type as this location
- boolean success = file.renameTo(((LocalLocation) destination).file);
- if (success) {
- return new LocalLocation(locationFactory, ((LocalLocation) destination).file);
+ Path target = Files.move(file.toPath(), ((LocalLocation) destination).file.toPath(),
+ StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+
+ if (target != null) {
+ return new LocalLocation(locationFactory, target.toFile());
} else {
return null;
}
[03/22] incubator-twill git commit: (TWILL-153) Honor actual resource
size of the container
Posted by ch...@apache.org.
(TWILL-153) Honor actual resource size of the container
- Determine the -Xmx based on the actual size of the container
- it can be smaller or bigger than the one requested in the TwillSpec
- Refactor resource specification for AM
- A forward looking change for TWILL-90
- Simple code cleanup to get rid of code warning from IDE.
- Fix a easy to fail test - ZKClientTest.testExpireRewatch()
This closes #63 in Github
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/66402b4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/66402b4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/66402b4f
Branch: refs/heads/site
Commit: 66402b4f234290194e6c9c8de3d7edf84aad22ff
Parents: d2d9081
Author: Terence Yim <ch...@apache.org>
Authored: Tue Sep 22 14:35:26 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Sep 23 22:34:45 2015 -0700
----------------------------------------------------------------------
.../org/apache/twill/internal/Constants.java | 5 +-
.../apache/twill/internal/ContainerInfo.java | 15 ++--
.../apache/twill/internal/ProcessLauncher.java | 2 +-
.../twill/internal/ResourceCapability.java | 34 ++++++++
.../twill/internal/TwillContainerLauncher.java | 19 ++---
.../internal/json/TwillRunResourcesCodec.java | 14 ++--
.../apache/twill/internal/utils/Resources.java | 46 +++++++++++
.../internal/yarn/Hadoop20YarnAppClient.java | 23 ++++--
.../internal/yarn/Hadoop21YarnAppClient.java | 24 ++++--
.../appmaster/ApplicationMasterInfo.java | 63 +++++++++++++++
.../ApplicationMasterProcessLauncher.java | 24 ++----
.../appmaster/ApplicationMasterService.java | 24 ++++--
.../appmaster/ApplicationSubmitter.java | 3 +-
.../yarn/AbstractYarnProcessLauncher.java | 3 +-
.../twill/internal/yarn/YarnAppClient.java | 11 +--
.../apache/twill/yarn/YarnTwillPreparer.java | 14 ++--
.../apache/twill/yarn/ContainerSizeTestRun.java | 83 +++++++++++++++++++-
.../apache/twill/zookeeper/ZKClientTest.java | 20 +++--
18 files changed, 339 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index defd013..f897bfa 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -37,14 +37,13 @@ public final class Constants {
*/
public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+ public static final double HEAP_MIN_RATIO = 0.7d;
+
/** Memory size of AM. */
public static final int APP_MASTER_MEMORY_MB = 512;
public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
- public static final String STDOUT = "stdout";
- public static final String STDERR = "stderr";
-
public static final String CLASSPATH = "classpath";
public static final String APPLICATION_CLASSPATH = "application-classpath";
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
index 67c21d3..5c93ede 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
@@ -22,15 +22,20 @@ import java.net.InetAddress;
/**
* Represents information of the container that the processing is/will be running in.
*/
-public interface ContainerInfo {
+public interface ContainerInfo extends ResourceCapability {
+ /**
+ * Returns the ID of the container.
+ */
String getId();
+ /**
+ * Returns the host information of the container.
+ */
InetAddress getHost();
+ /**
+ * Returns the port for communicating to the container host.
+ */
int getPort();
-
- int getMemoryMB();
-
- int getVirtualCores();
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
index d0f289b..beb8e6b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
@@ -26,7 +26,7 @@ import java.util.Map;
*
* @param <T> Type of the object that contains information about the container that the process is going to launch.
*/
-public interface ProcessLauncher<T> {
+public interface ProcessLauncher<T extends ResourceCapability> {
/**
* Returns information about the container that this launch would launch process in.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
new file mode 100644
index 0000000..2cdd080
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Represents information about compute resources capability.
+ */
+public interface ResourceCapability {
+
+ /**
+ * Returns memory size in MB.
+ */
+ int getMemoryMB();
+
+ /**
+ * Returns the number of virtual cpu cores.
+ */
+ int getVirtualCores();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 0667bb4..d01db4b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -27,6 +27,7 @@ import org.apache.twill.api.RunId;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.utils.Resources;
import org.apache.twill.launcher.FindFreePort;
import org.apache.twill.launcher.TwillLauncher;
import org.apache.twill.zookeeper.NodeData;
@@ -46,9 +47,8 @@ public final class TwillContainerLauncher {
private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class);
- private static final double HEAP_MIN_RATIO = 0.7d;
-
private final RuntimeSpecification runtimeSpec;
+ private final ContainerInfo containerInfo;
private final ProcessLauncher.PrepareLaunchContext launchContext;
private final ZKClient zkClient;
private final int instanceCount;
@@ -56,10 +56,12 @@ public final class TwillContainerLauncher {
private final int reservedMemory;
private final Location secureStoreLocation;
- public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ProcessLauncher.PrepareLaunchContext launchContext,
+ public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo,
+ ProcessLauncher.PrepareLaunchContext launchContext,
ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, int reservedMemory,
Location secureStoreLocation) {
this.runtimeSpec = runtimeSpec;
+ this.containerInfo = containerInfo;
this.launchContext = launchContext;
this.zkClient = zkClient;
this.instanceCount = instanceCount;
@@ -98,15 +100,6 @@ public final class TwillContainerLauncher {
LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation);
}
- int memory = runtimeSpec.getResourceSpecification().getMemorySize();
- if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) {
- // Reduce -Xmx by the reserved memory size.
- memory = runtimeSpec.getResourceSpecification().getMemorySize() - reservedMemory;
- } else {
- // If it is a small VM, just discount it by the min ratio.
- memory = (int) Math.ceil(memory * HEAP_MIN_RATIO);
- }
-
// Currently no reporting is supported for runnable containers
launchContext
.addEnvironment(EnvKeys.TWILL_RUN_ID, runId.getId())
@@ -135,6 +128,8 @@ public final class TwillContainerLauncher {
} else {
firstCommand = "$JAVA_HOME/bin/java";
}
+
+ int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, Constants.HEAP_MIN_RATIO);
commandBuilder.add("-Djava.io.tmpdir=tmp",
"-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
"-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
index 9a6f555..7dea371 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -35,13 +35,13 @@ import java.lang.reflect.Type;
*/
public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunResources>,
JsonDeserializer<TwillRunResources> {
- private final String CONTAINER_ID = "containerId";
- private final String INSTANCE_ID = "instanceId";
- private final String HOST = "host";
- private final String MEMORY_MB = "memoryMB";
- private final String VIRTUAL_CORES = "virtualCores";
- private final String DEBUG_PORT = "debugPort";
- private final String LOG_LEVEL = "logLevel";
+ private static final String CONTAINER_ID = "containerId";
+ private static final String INSTANCE_ID = "instanceId";
+ private static final String HOST = "host";
+ private static final String MEMORY_MB = "memoryMB";
+ private static final String VIRTUAL_CORES = "virtualCores";
+ private static final String DEBUG_PORT = "debugPort";
+ private static final String LOG_LEVEL = "logLevel";
@Override
public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
new file mode 100644
index 0000000..2ebb789
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+/**
+ * Utility class to help adjusting container resource requirement.
+ */
+public final class Resources {
+
+ /**
+ * Computes the max heap size for a JVM process.
+ *
+ * @param containerMemory memory in MB of the container memory.
+ * It is the maximum memory size allowed for the process.
+ * @param nonHeapMemory memory in MB that needs to be reserved for non JVM heap memory for the process.
+ * @param minHeapRatio minimum ratio for heap to non-heap memory.
+ * @return memory in MB representing the max heap size for a JVM process.
+ */
+ public static int computeMaxHeapSize(int containerMemory, int nonHeapMemory, double minHeapRatio) {
+ if (((double) (containerMemory - nonHeapMemory) / containerMemory) >= minHeapRatio) {
+ // Reduce -Xmx by the reserved memory size.
+ return containerMemory - nonHeapMemory;
+ } else {
+ // If it is a small VM, just discount it by the min ratio.
+ return (int) Math.ceil(containerMemory * minHeapRatio);
+ }
+ }
+
+ private Resources() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
index 3afa49a..bfa494c 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
import org.apache.twill.internal.appmaster.ApplicationSubmitter;
import org.slf4j.Logger;
@@ -70,8 +72,8 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
}
@Override
- public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec,
- @Nullable String schedulerQueue) throws Exception {
+ public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec,
+ @Nullable String schedulerQueue) throws Exception {
// Request for new application
final GetNewApplicationResponse response = yarnClient.getNewApplication();
final ApplicationId appId = response.getApplicationId();
@@ -86,10 +88,17 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
appSubmissionContext.setQueue(schedulerQueue);
}
+ // TODO: Make it adjustable through TwillSpec (TWILL-90)
+ // Set the resource requirement for AM
+ Resource amResource = Records.newRecord(Resource.class);
+ amResource.setMemory(Constants.APP_MASTER_MEMORY_MB);
+ final Resource capability = adjustMemory(response, amResource);
+ ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(), 1);
+
ApplicationSubmitter submitter = new ApplicationSubmitter() {
@Override
- public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) {
+ public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext) {
ContainerLaunchContext context = launchContext.getLaunchContext();
addRMToken(context);
context.setUser(appSubmissionContext.getUser());
@@ -106,7 +115,7 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
}
};
- return new ApplicationMasterProcessLauncher(appId, submitter);
+ return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
}
private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
@@ -158,9 +167,9 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
}
@Override
- public ProcessLauncher<ApplicationId> createLauncher(String user,
- TwillSpecification twillSpec,
- @Nullable String schedulerQueue) throws Exception {
+ public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+ TwillSpecification twillSpec,
+ @Nullable String schedulerQueue) throws Exception {
this.user = user;
return createLauncher(twillSpec, schedulerQueue);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index 046e3f1..6fd11e5 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
import org.apache.twill.internal.appmaster.ApplicationSubmitter;
import org.slf4j.Logger;
@@ -64,8 +66,8 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
}
@Override
- public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec,
- @Nullable String schedulerQueue) throws Exception {
+ public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec,
+ @Nullable String schedulerQueue) throws Exception {
// Request for new application
YarnClientApplication application = yarnClient.createApplication();
final GetNewApplicationResponse response = application.getNewApplicationResponse();
@@ -80,14 +82,20 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
appSubmissionContext.setQueue(schedulerQueue);
}
+ // TODO: Make it adjustable through TwillSpec (TWILL-90)
+ // Set the resource requirement for AM
+ final Resource capability = adjustMemory(response, Resource.newInstance(Constants.APP_MASTER_MEMORY_MB, 1));
+ ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(),
+ capability.getVirtualCores());
+
ApplicationSubmitter submitter = new ApplicationSubmitter() {
@Override
- public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) {
+ public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context) {
ContainerLaunchContext launchContext = context.getLaunchContext();
addRMToken(launchContext);
appSubmissionContext.setAMContainerSpec(launchContext);
- appSubmissionContext.setResource(adjustMemory(response, capability));
+ appSubmissionContext.setResource(capability);
appSubmissionContext.setMaxAppAttempts(2);
try {
@@ -100,7 +108,7 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
}
};
- return new ApplicationMasterProcessLauncher(appId, submitter);
+ return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
}
private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
@@ -139,9 +147,9 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
}
@Override
- public ProcessLauncher<ApplicationId> createLauncher(String user,
- TwillSpecification twillSpec,
- @Nullable String schedulerQueue) throws Exception {
+ public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+ TwillSpecification twillSpec,
+ @Nullable String schedulerQueue) throws Exception {
// Ignore user
return createLauncher(twillSpec, schedulerQueue);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
new file mode 100644
index 0000000..fbeeca0
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.twill.internal.ResourceCapability;
+
+/**
+ * Represents information of the application master.
+ */
+public class ApplicationMasterInfo implements ResourceCapability {
+
+ private final ApplicationId appId;
+ private final int memoryMB;
+ private final int virtualCores;
+
+ public ApplicationMasterInfo(ApplicationId appId, int memoryMB, int virtualCores) {
+ this.appId = appId;
+ this.memoryMB = memoryMB;
+ this.virtualCores = virtualCores;
+ }
+
+ /**
+ * Returns the application ID for the YARN application.
+ */
+ public ApplicationId getAppId() {
+ return appId;
+ }
+
+ @Override
+ public int getMemoryMB() {
+ return memoryMB;
+ }
+
+ @Override
+ public int getVirtualCores() {
+ return virtualCores;
+ }
+
+ @Override
+ public String toString() {
+ return "ApplicationMasterInfo{" +
+ "appId=" + appId +
+ ", memoryMB=" + memoryMB +
+ ", virtualCores=" + virtualCores +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
index 126ff97..da11816 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
@@ -19,38 +19,30 @@ package org.apache.twill.internal.appmaster;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.twill.internal.Constants;
import org.apache.twill.internal.EnvKeys;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
import org.apache.twill.internal.yarn.YarnLaunchContext;
-import org.apache.twill.internal.yarn.YarnUtils;
import java.util.Map;
/**
* A {@link org.apache.twill.internal.ProcessLauncher} for launching Application Master from the client.
*/
-public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationId> {
+public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationMasterInfo> {
private final ApplicationSubmitter submitter;
- public ApplicationMasterProcessLauncher(ApplicationId appId, ApplicationSubmitter submitter) {
- super(appId);
+ public ApplicationMasterProcessLauncher(ApplicationMasterInfo info, ApplicationSubmitter submitter) {
+ super(info);
this.submitter = submitter;
}
@Override
@SuppressWarnings("unchecked")
protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
- final ApplicationId appId = getContainerInfo();
-
- // Set the resource requirement for AM
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(Constants.APP_MASTER_MEMORY_MB);
- YarnUtils.setVirtualCores(capability, 1);
+ ApplicationMasterInfo appMasterInfo = getContainerInfo();
+ ApplicationId appId = appMasterInfo.getAppId();
// Put in extra environments
Map<String, String> env = ImmutableMap.<String, String>builder()
@@ -58,11 +50,11 @@ public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessL
.put(EnvKeys.YARN_APP_ID, Integer.toString(appId.getId()))
.put(EnvKeys.YARN_APP_ID_CLUSTER_TIME, Long.toString(appId.getClusterTimestamp()))
.put(EnvKeys.YARN_APP_ID_STR, appId.toString())
- .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(Constants.APP_MASTER_MEMORY_MB))
- .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(YarnUtils.getVirtualCores(capability)))
+ .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(appMasterInfo.getMemoryMB()))
+ .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(appMasterInfo.getVirtualCores()))
.build();
launchContext.setEnvironment(env);
- return (ProcessController<R>) submitter.submit(launchContext, capability);
+ return (ProcessController<R>) submitter.submit(launchContext);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 818db05..355cea3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -176,10 +176,14 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
}
@SuppressWarnings("unchecked")
+ @Nullable
private EventHandler createEventHandler(TwillSpecification twillSpec) {
try {
// Should be able to load by this class ClassLoader, as they packaged in the same jar.
EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
+ if (handlerSpec == null) {
+ return null;
+ }
Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName());
Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
@@ -221,7 +225,9 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec));
// initialize the event handler, if it fails, it will fail the application.
- eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler()));
+ if (eventHandler != null) {
+ eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler()));
+ }
instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
@@ -237,11 +243,13 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec));
- try {
- // call event handler destroy. If there is error, only log and not affected stop sequence.
- eventHandler.destroy();
- } catch (Throwable t) {
- LOG.warn("Exception when calling {}.destroy()", twillSpec.getEventHandler().getClassName(), t);
+ if (eventHandler != null) {
+ try {
+ // call event handler destroy. If there is error, only log and not affected stop sequence.
+ eventHandler.destroy();
+ } catch (Throwable t) {
+ LOG.warn("Exception when calling {}.destroy()", eventHandler.getClass().getName(), t);
+ }
}
instanceChangeExecutor.shutdownNow();
@@ -491,7 +499,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
}
}
- if (!timeoutEvents.isEmpty()) {
+ if (!timeoutEvents.isEmpty() && eventHandler != null) {
try {
EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents);
if (action.getTimeout() < 0) {
@@ -640,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
);
TwillContainerLauncher launcher = new TwillContainerLauncher(
- twillSpec.getRunnables().get(runnableName), launchContext,
+ twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext,
ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
index 38f90ae..e82dbbc 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.appmaster;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.internal.yarn.YarnLaunchContext;
@@ -27,5 +26,5 @@ import org.apache.twill.internal.yarn.YarnLaunchContext;
*/
public interface ApplicationSubmitter {
- ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability);
+ ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
index 2023b0e..eb917f3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.twill.api.LocalFile;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.ResourceCapability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import java.util.Map;
*
* @param <T> Type of the object that contains information about the container that the process is going to launch.
*/
-public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> {
+public abstract class AbstractYarnProcessLauncher<T extends ResourceCapability> implements ProcessLauncher<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
index 67a2292..df956bf 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
import java.util.List;
import javax.annotation.Nullable;
@@ -36,8 +37,8 @@ public interface YarnAppClient extends Service {
* Creates a {@link ProcessLauncher} for launching the application represented by the given spec. If scheduler queue
* is available and is supported by the YARN cluster, it will be launched in the given queue.
*/
- ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec,
- @Nullable String schedulerQueue) throws Exception;
+ ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec,
+ @Nullable String schedulerQueue) throws Exception;
/**
* Creates a {@link ProcessLauncher} for launching application with the given user and spec. If scheduler queue
@@ -46,9 +47,9 @@ public interface YarnAppClient extends Service {
* @deprecated This method will get removed.
*/
@Deprecated
- ProcessLauncher<ApplicationId> createLauncher(String user,
- TwillSpecification twillSpec,
- @Nullable String schedulerQueue) throws Exception;
+ ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+ TwillSpecification twillSpec,
+ @Nullable String schedulerQueue) throws Exception;
/**
* Creates a {@link ProcessController} that can controls an application represented by the given application id.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 4e9f76d..a444dda 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -40,7 +40,6 @@ import com.google.gson.GsonBuilder;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.ClassAcceptor;
import org.apache.twill.api.EventHandlerSpecification;
@@ -68,6 +67,7 @@ import org.apache.twill.internal.LogOnlyEventHandler;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
import org.apache.twill.internal.appmaster.ApplicationMasterMain;
import org.apache.twill.internal.container.TwillContainerMain;
import org.apache.twill.internal.json.ArgumentsCodec;
@@ -76,6 +76,7 @@ import org.apache.twill.internal.json.LocalFileCodec;
import org.apache.twill.internal.json.TwillSpecificationAdapter;
import org.apache.twill.internal.utils.Dependencies;
import org.apache.twill.internal.utils.Paths;
+import org.apache.twill.internal.utils.Resources;
import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.internal.yarn.YarnUtils;
@@ -283,8 +284,8 @@ final class YarnTwillPreparer implements TwillPreparer {
@Override
public TwillController start() {
try {
- final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue);
- final ApplicationId appId = launcher.getContainerInfo();
+ final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue);
+ final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo();
Callable<ProcessController<YarnApplicationReport>> submitTask =
new Callable<ProcessController<YarnApplicationReport>>() {
@Override
@@ -310,7 +311,7 @@ final class YarnTwillPreparer implements TwillPreparer {
Constants.Files.LAUNCHER_JAR,
Constants.Files.ARGUMENTS));
- LOG.debug("Submit AM container spec: {}", appId);
+ LOG.debug("Submit AM container spec: {}", appMasterInfo);
// java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory
// org.apache.twill.internal.TwillLauncher
// appMaster.jar
@@ -329,6 +330,9 @@ final class YarnTwillPreparer implements TwillPreparer {
LOG.debug("Log level is set to {} for the Twill application.", logLevel);
builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());
}
+
+ int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
+ Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);
return launcher.prepareLaunch(builder.build(), localFiles.values(), credentials)
.addCommand(
"$JAVA_HOME/bin/java",
@@ -336,7 +340,7 @@ final class YarnTwillPreparer implements TwillPreparer {
"-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
"-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME,
"-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
- "-Xmx" + (Constants.APP_MASTER_MEMORY_MB - Constants.APP_MASTER_RESERVED_MEMORY_MB) + "m",
+ "-Xmx" + memory + "m",
extraOptions == null ? "" : extraOptions,
TwillLauncher.class.getName(),
Constants.Files.APP_MASTER_JAR,
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index 6e27b69..c6f7b9a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -29,6 +29,8 @@ import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.discovery.ServiceDiscovered;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.util.concurrent.ExecutionException;
@@ -36,11 +38,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
- * Test for requesting different container size in different order.
- * It specifically test for workaround for YARN-314.
+ * Tests related to different container sizes.
*/
public class ContainerSizeTestRun extends BaseYarnTest {
+ /**
+ * Test for requesting different container size in different order.
+ * It specifically test for workaround for YARN-314.
+ */
@Test
public void testContainerSize() throws InterruptedException, TimeoutException, ExecutionException {
TwillRunner runner = getTwillRunner();
@@ -56,6 +61,20 @@ public class ContainerSizeTestRun extends BaseYarnTest {
}
}
+ @Test
+ public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException {
+ TwillRunner runner = getTwillRunner();
+ TwillController controller = runner.prepare(new MaxHeapApp())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ try {
+ ServiceDiscovered discovered = controller.discoverService("sleep");
+ Assert.assertTrue(waitForSize(discovered, 1, 120));
+ } finally {
+ controller.terminate().get(120, TimeUnit.SECONDS);
+ }
+ }
/**
* An application that has two runnables with different memory size.
@@ -86,7 +105,6 @@ public class ContainerSizeTestRun extends BaseYarnTest {
}
}
-
/**
* A runnable that sleep for 120 seconds.
*/
@@ -116,4 +134,63 @@ public class ContainerSizeTestRun extends BaseYarnTest {
}
}
}
+
+ /**
+ * An application for testing max heap size.
+ */
+ public static final class MaxHeapApp implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ // Make the runnable request for container smaller than 128MB (the allocation minimum)
+ ResourceSpecification res = ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(16, ResourceSpecification.SizeUnit.MEGA)
+ .build();
+
+ return TwillSpecification.Builder.with()
+ .setName("MaxHeapApp")
+ .withRunnable()
+ .add("sleep", new MaxHeapRunnable(12345), res).noLocalFiles()
+ .anyOrder()
+ .build();
+ }
+ }
+
+ /**
+ * The runnable for testing max heap size.
+ */
+ public static final class MaxHeapRunnable extends AbstractTwillRunnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MaxHeapRunnable.class);
+ private volatile Thread runThread;
+
+ public MaxHeapRunnable(int port) {
+ super(ImmutableMap.of("port", Integer.toString(port)));
+ }
+
+ @Override
+ public void run() {
+ // This heap size should be > 16, since the min allocation size is 128mb
+ if (Runtime.getRuntime().maxMemory() <= 16 * 1024 * 1024) {
+ LOG.error("Memory size is too small: {}", Runtime.getRuntime().maxMemory());
+ return;
+ }
+
+ runThread = Thread.currentThread();
+ getContext().announce("sleep", Integer.parseInt(getContext().getSpecification().getConfigs().get("port")));
+ try {
+ TimeUnit.SECONDS.sleep(120);
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (runThread != null) {
+ runThread.interrupt();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 162d4db..a9120c3 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -30,6 +30,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -188,12 +189,21 @@ public class ZKClientTest {
client.startAndWait();
try {
- final BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<Watcher.Event.EventType>();
+ final BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<>();
client.exists("/expireRewatch", new Watcher() {
@Override
- public void process(WatchedEvent event) {
- client.exists("/expireRewatch", this);
- events.add(event.getType());
+ public void process(final WatchedEvent event) {
+ Futures.addCallback(client.exists("/expireRewatch", this), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ events.add(event.getType());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Failed to call exists on /expireRewatch", t);
+ }
+ });
}
});
@@ -309,7 +319,7 @@ public class ZKClientTest {
Assert.assertEquals("digest", acl.getId().getScheme());
Assert.assertEquals(digest, acl.getId().getId());
- Assert.assertEquals("test", new String(noAuthClient.getData(path).get().getData()));
+ Assert.assertArrayEquals("test".getBytes(), noAuthClient.getData(path).get().getData());
// When tries to write using the no-auth zk client, it should fail.
try {
[08/22] incubator-twill git commit: (TWILL-131) Remove ZK node when
application finished.
Posted by ch...@apache.org.
(TWILL-131) Remove ZK node when application finished.
- Remove the application ZK node when the application terminates
This closes #70 on Github
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/e4a36762
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/e4a36762
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/e4a36762
Branch: refs/heads/site
Commit: e4a36762e8df34aa4971e29863714f199cb8ddcd
Parents: f6d2b6c
Author: Terence Yim <ch...@apache.org>
Authored: Wed Oct 14 13:39:32 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Oct 20 08:47:20 2015 -0700
----------------------------------------------------------------------
.../org/apache/twill/internal/Constants.java | 83 ++++++++++++++++++
.../twill/internal/AbstractTwillService.java | 2 +-
.../internal/AbstractZKServiceController.java | 3 +-
.../org/apache/twill/internal/Constants.java | 76 -----------------
.../twill/discovery/ZKDiscoveryService.java | 4 +-
.../twill/discovery/ZKDiscoveryServiceTest.java | 2 +-
.../org/apache/twill/internal/ServiceMain.java | 19 +++--
.../appmaster/ApplicationMasterMain.java | 89 +++++++++++++++++++-
.../appmaster/ApplicationMasterService.java | 6 +-
.../internal/container/TwillContainerMain.java | 5 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 8 +-
.../twill/yarn/YarnTwillRunnerService.java | 4 +-
.../org/apache/twill/yarn/BaseYarnTest.java | 4 +
.../apache/twill/yarn/EchoServerTestRun.java | 68 +++++++++++++--
.../java/org/apache/twill/yarn/TwillTester.java | 4 +
15 files changed, 266 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
new file mode 100644
index 0000000..dd04eb1
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * This class contains collection of common constants used in Twill.
+ */
+public final class Constants {
+
+ public static final String LOG_TOPIC = "log";
+
+ /** Maximum number of seconds for AM to start. */
+ public static final int APPLICATION_MAX_START_SECONDS = 60;
+ /** Maximum number of seconds for AM to stop. */
+ public static final int APPLICATION_MAX_STOP_SECONDS = 60;
+
+ public static final long PROVISION_TIMEOUT = 30000;
+
+ /**
+ * Milliseconds AM should wait for RM to allocate a constrained provision request.
+ * On timeout, AM relaxes the request constraints.
+ */
+ public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
+ public static final double HEAP_MIN_RATIO = 0.7d;
+
+ /** Memory size of AM. */
+ public static final int APP_MASTER_MEMORY_MB = 512;
+
+ public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
+
+ public static final String CLASSPATH = "classpath";
+ public static final String APPLICATION_CLASSPATH = "application-classpath";
+
+ /** Command names for the restart runnable instances. */
+ public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
+ public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
+
+ /**
+ * Common ZK paths constants
+ */
+ public static final String DISCOVERY_PATH_PREFIX = "/discoverable";
+ public static final String INSTANCES_PATH_PREFIX = "/instances";
+
+
+ /**
+ * Constants for names of internal files that are shared between client, AM and containers.
+ */
+ public static final class Files {
+
+ public static final String LAUNCHER_JAR = "launcher.jar";
+ public static final String APP_MASTER_JAR = "appMaster.jar";
+ public static final String CONTAINER_JAR = "container.jar";
+ public static final String LOCALIZE_FILES = "localizeFiles.json";
+ public static final String TWILL_SPEC = "twillSpec.json";
+ public static final String ARGUMENTS = "arguments.json";
+ public static final String ENVIRONMENTS = "environments.json";
+ public static final String LOGBACK_TEMPLATE = "logback-template.xml";
+ public static final String JVM_OPTIONS = "jvm.opts";
+ public static final String CREDENTIALS = "credentials.store";
+
+ private Files() {
+ }
+ }
+
+ private Constants() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 2f95e0e..8688d0b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -359,7 +359,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic
}
private String getLiveNodePath() {
- return "/instances/" + runId.getId();
+ return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
}
private <T> byte[] toJson(T obj) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 0cf92ea..9b30823 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.twill.api.Command;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
@@ -239,7 +238,7 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi
* Returns the zookeeper node path for the ephemeral instance node for this runId.
*/
protected final String getInstancePath() {
- return String.format("/instances/%s", getRunId().getId());
+ return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, getRunId().getId());
}
private String getZKPath(String path) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
deleted file mode 100644
index 39de851..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * This class contains collection of common constants used in Twill.
- */
-public final class Constants {
-
- public static final String LOG_TOPIC = "log";
-
- /** Maximum number of seconds for AM to start. */
- public static final int APPLICATION_MAX_START_SECONDS = 60;
- /** Maximum number of seconds for AM to stop. */
- public static final int APPLICATION_MAX_STOP_SECONDS = 60;
-
- public static final long PROVISION_TIMEOUT = 30000;
-
- /**
- * Milliseconds AM should wait for RM to allocate a constrained provision request.
- * On timeout, AM relaxes the request constraints.
- */
- public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
-
- public static final double HEAP_MIN_RATIO = 0.7d;
-
- /** Memory size of AM. */
- public static final int APP_MASTER_MEMORY_MB = 512;
-
- public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
-
- public static final String CLASSPATH = "classpath";
- public static final String APPLICATION_CLASSPATH = "application-classpath";
-
- /** Command names for the restart runnable instances. */
- public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances";
- public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances";
-
- /**
- * Constants for names of internal files that are shared between client, AM and containers.
- */
- public static final class Files {
-
- public static final String LAUNCHER_JAR = "launcher.jar";
- public static final String APP_MASTER_JAR = "appMaster.jar";
- public static final String CONTAINER_JAR = "container.jar";
- public static final String LOCALIZE_FILES = "localizeFiles.json";
- public static final String TWILL_SPEC = "twillSpec.json";
- public static final String ARGUMENTS = "arguments.json";
- public static final String ENVIRONMENTS = "environments.json";
- public static final String LOGBACK_TEMPLATE = "logback-template.xml";
- public static final String JVM_OPTIONS = "jvm.opts";
- public static final String CREDENTIALS = "credentials.store";
-
- private Files() {
- }
- }
-
- private Constants() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
index 3f0db34..c563bab 100644
--- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
+++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
+import org.apache.twill.internal.Constants;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
@@ -94,7 +95,6 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
- private static final String NAMESPACE = "/discoverable";
private static final long RETRY_MILLIS = 1000;
@@ -112,7 +112,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli
* @param zkClient The {@link ZKClient} for interacting with zookeeper.
*/
public ZKDiscoveryService(ZKClient zkClient) {
- this(zkClient, NAMESPACE);
+ this(zkClient, Constants.DISCOVERY_PATH_PREFIX);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
index 7707c5b..7d6e369 100644
--- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
+++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java
@@ -63,7 +63,7 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase {
zkServer.stopAndWait();
}
- @Test (timeout = 10000)
+ @Test (timeout = 30000)
public void testDoubleRegister() throws Exception {
Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create();
DiscoveryService discoveryService = entry.getKey();
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index a6af3d3..cafd375 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -157,12 +157,16 @@ public abstract class ServiceMain {
/**
* Creates a {@link ZKClientService}.
*/
- protected static ZKClientService createZKClient(String zkConnectStr) {
+ protected static ZKClientService createZKClient(String zkConnectStr, String appName) {
return ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(
- ZKClientService.Builder.of(zkConnectStr).build(),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+ ZKClients.namespace(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(
+ ZKClientService.Builder.of(zkConnectStr).build(),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS)
+ )
+ ), "/" + appName
+ ));
}
private void configureLogger() {
@@ -256,10 +260,11 @@ public abstract class ServiceMain {
/**
* A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}.
*/
- protected static final class TwillZKPathService extends AbstractIdleService {
+ protected static class TwillZKPathService extends AbstractIdleService {
+
+ protected static final long TIMEOUT_SECONDS = 5L;
private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class);
- private static final long TIMEOUT_SECONDS = 5L;
private final ZKClient zkClient;
private final String path;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 3e8cb93..38a2463 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -19,6 +19,7 @@ package org.apache.twill.internal.appmaster;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -32,6 +33,7 @@ import org.apache.twill.internal.logging.Loggings;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKOperations;
@@ -43,7 +45,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -65,7 +70,7 @@ public final class ApplicationMasterMain extends ServiceMain {
File twillSpec = new File(Constants.Files.TWILL_SPEC);
RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
- ZKClientService zkClientService = createZKClient(zkConnect);
+ ZKClientService zkClientService = createZKClient(zkConnect, System.getenv(EnvKeys.TWILL_APP_NAME));
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
setRMSchedulerAddress(conf);
@@ -74,12 +79,12 @@ public final class ApplicationMasterMain extends ServiceMain {
twillSpec, amClient, createAppLocation(conf));
TrackerService trackerService = new TrackerService(service);
- new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId()))
+ new ApplicationMasterMain(service.getKafkaZKConnect())
.doMain(
service,
new YarnAMClientService(amClient, trackerService),
zkClientService,
- new TwillZKPathService(zkClientService, runId),
+ new AppMasterTwillZKPathService(zkClientService, runId),
new ApplicationKafkaService(zkClientService, runId)
);
}
@@ -229,4 +234,82 @@ public final class ApplicationMasterMain extends ServiceMain {
}
}
}
+
+ private static final class AppMasterTwillZKPathService extends TwillZKPathService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
+ private final ZKClient zkClient;
+
+ public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
+ super(zkClient, runId);
+ this.zkClient = zkClient;
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ super.shutDown();
+
+ // Deletes ZK nodes created for the application execution.
+ // We don't have to worry about a race condition if another instance of the same app starts at the same time
+ // as when removal is performed. This is because we always create nodes with "createParent == true",
+ // which takes care of the parent node recreation if it is removed from here.
+
+ // Try to delete the /instances path. It may throws NotEmptyException if there are other instances of the
+ // same app running, which we can safely ignore and return.
+ if (!delete(Constants.INSTANCES_PATH_PREFIX)) {
+ return;
+ }
+
+ // Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances
+ // of the same app running that has discovery services running.
+ List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
+ List<OperationFuture<?>> deleteFutures = new ArrayList<>();
+ for (String child : children) {
+ String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child;
+ LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+ deleteFutures.add(zkClient.delete(path));
+ }
+ Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ for (OperationFuture<?> future : deleteFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof KeeperException.NotEmptyException) {
+ return;
+ }
+ throw e;
+ }
+ }
+
+ // Delete the /discovery. It may fail with NotEmptyException (due to race between apps),
+ // which can safely ignore and return.
+ if (!delete(Constants.DISCOVERY_PATH_PREFIX)) {
+ return;
+ }
+
+ // Delete the ZK path for the app namespace.
+ delete("/");
+ }
+
+ /**
+ * Deletes the given ZK path.
+ *
+ * @param path path to delete
+ * @return true if the path was deleted, false if failed to delete due to {@link KeeperException.NotEmptyException}.
+ * @throws Exception if failed to delete the path
+ */
+ private boolean delete(String path) throws Exception {
+ try {
+ LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
+ zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ return true;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof KeeperException.NotEmptyException) {
+ return false;
+ }
+ throw e;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index e1523d6..c376de4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -236,7 +236,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
// Creates ZK path for runnable
zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
- runningContainers.addWatcher("/discoverable");
+ runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
runnableContainerRequests = initContainerRequests();
}
@@ -648,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
- env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString());
+ env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT));
env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(),
@@ -703,7 +703,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
return String.format("/%s/runnables/%s", runId.getId(), runnableName);
}
- private String getKafkaZKConnect() {
+ String getKafkaZKConnect() {
return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index 51837a7..3ea786a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -18,7 +18,6 @@
package org.apache.twill.internal.container;
import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractService;
@@ -28,9 +27,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.twill.api.LocalFile;
import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.discovery.ZKDiscoveryService;
@@ -80,7 +77,7 @@ public final class TwillContainerMain extends ServiceMain {
int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
- ZKClientService zkClientService = createZKClient(zkConnectStr);
+ ZKClientService zkClientService = createZKClient(zkConnectStr, System.getenv(EnvKeys.TWILL_APP_NAME));
ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService);
ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index d4edfeb..d04cdab 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -116,7 +116,7 @@ final class YarnTwillPreparer implements TwillPreparer {
private final YarnConfiguration yarnConfig;
private final TwillSpecification twillSpec;
private final YarnAppClient yarnAppClient;
- private final ZKClient zkClient;
+ private final String zkConnectString;
private final LocationFactory locationFactory;
private final YarnTwillControllerFactory controllerFactory;
private final RunId runId;
@@ -139,13 +139,13 @@ final class YarnTwillPreparer implements TwillPreparer {
private LogEntry.Level logLevel;
YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
- YarnAppClient yarnAppClient, ZKClient zkClient,
+ YarnAppClient yarnAppClient, String zkConnectString,
LocationFactory locationFactory, String extraOptions, LogEntry.Level logLevel,
YarnTwillControllerFactory controllerFactory) {
this.yarnConfig = yarnConfig;
this.twillSpec = twillSpec;
this.yarnAppClient = yarnAppClient;
- this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName());
+ this.zkConnectString = zkConnectString;
this.locationFactory = locationFactory;
this.controllerFactory = controllerFactory;
this.runId = RunIds.generate();
@@ -345,7 +345,7 @@ final class YarnTwillPreparer implements TwillPreparer {
ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
.put(EnvKeys.TWILL_FS_USER, fsUser)
.put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString())
- .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+ .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString)
.put(EnvKeys.TWILL_RUN_ID, runId.getId())
.put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory))
.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 8a13017..c5853d6 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -277,8 +277,8 @@ public final class YarnTwillRunnerService implements TwillRunnerService {
final TwillSpecification twillSpec = application.configure();
final String appName = twillSpec.getName();
- return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions,
- LogEntry.Level.INFO, new YarnTwillControllerFactory() {
+ return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService.getConnectString(),
+ locationFactory, jvmOptions, LogEntry.Level.INFO, new YarnTwillControllerFactory() {
@Override
public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
Callable<ProcessController<YarnApplicationReport>> startUp) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index b5c7f58..a9cf2ed 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -109,4 +109,8 @@ public abstract class BaseYarnTest {
public List<NodeReport> getNodeReports() throws Exception {
return TWILL_TESTER.getNodeReports();
}
+
+ public String getZKConnectionString() {
+ return TWILL_TESTER.getZKConnectionString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index 3f0f20c..13c07b1 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -31,23 +31,22 @@ import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.zookeeper.ZKClientService;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
-import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
/**
@@ -58,8 +57,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class);
@Test
- public void testEchoServer() throws InterruptedException, ExecutionException, IOException,
- URISyntaxException, TimeoutException {
+ public void testEchoServer() throws Exception {
TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new EchoServer(),
@@ -158,6 +156,64 @@ public final class EchoServerTestRun extends BaseYarnTest {
TimeUnit.SECONDS.sleep(2);
}
+ @Test
+ public void testZKCleanup() throws Exception {
+ ZKClientService zkClient = ZKClientService.Builder.of(getZKConnectionString() + "/twill").build();
+ zkClient.startAndWait();
+
+ try {
+ TwillRunner runner = getTwillRunner();
+
+ // Start an application and stop it.
+ TwillController controller = runner.prepare(new EchoServer())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("EchoServer", "echo2")
+ .start();
+
+ Iterable<Discoverable> echoServices = controller.discoverService("echo");
+ Assert.assertTrue(waitForSize(echoServices, 1, 120));
+
+ controller.terminate().get();
+
+ // Verify the ZK node gets cleanup
+ Assert.assertNull(zkClient.exists("/EchoServer").get());
+
+ // Start two instances of the application and stop one of it
+ List<TwillController> controllers = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ controllers.add(runner.prepare(new EchoServer())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("EchoServer", "echo2")
+ .start());
+ }
+
+ // There should be two instances up and running.
+ echoServices = controller.discoverService("echo");
+ Assert.assertTrue(waitForSize(echoServices, 2, 120));
+
+ // Stop one instance of the app
+ controllers.get(0).terminate().get();
+
+ // Verify the ZK node should still be there
+ Assert.assertNotNull(zkClient.exists("/EchoServer").get());
+
+ // We should still be able to do discovery, which depends on the ZK node.
+ echoServices = controller.discoverService("echo");
+ Assert.assertTrue(waitForSize(echoServices, 1, 120));
+
+ // Stop second instance of the app
+ controllers.get(1).terminate().get();
+
+ // Verify the ZK node gets cleanup
+ Assert.assertNull(zkClient.exists("/EchoServer").get());
+
+ } finally {
+ zkClient.stopAndWait();
+ }
+ }
+
/**
* Need helper method here to wait for getting resource report because {@link TwillController#getResourceReport()}
* could return null if the application has not fully started.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
index f669b83..e604cec 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -160,6 +160,10 @@ public class TwillTester extends ExternalResource {
return yarnAppClient.getNodeReports();
}
+ public String getZKConnectionString() {
+ return zkServer.getConnectionStr();
+ }
+
private void stopQuietly(Service service) {
try {
service.stopAndWait();
[20/22] incubator-twill git commit: Bump version to
0.8.0-incubating-SNAPSHOT
Posted by ch...@apache.org.
Bump version to 0.8.0-incubating-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/59bf3883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/59bf3883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/59bf3883
Branch: refs/heads/site
Commit: 59bf3883f2016fffae1a4b68aa68b28912ca993d
Parents: c6b3f0d
Author: Terence Yim <ch...@apache.org>
Authored: Mon Jan 11 10:36:57 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Mon Jan 11 10:36:57 2016 -0800
----------------------------------------------------------------------
pom.xml | 2 +-
twill-api/pom.xml | 2 +-
twill-common/pom.xml | 2 +-
twill-core/pom.xml | 2 +-
twill-discovery-api/pom.xml | 2 +-
twill-discovery-core/pom.xml | 2 +-
twill-examples/echo/pom.xml | 2 +-
twill-examples/pom.xml | 2 +-
twill-examples/yarn/pom.xml | 2 +-
twill-ext/pom.xml | 2 +-
twill-java8-test/pom.xml | 2 +-
twill-yarn/pom.xml | 2 +-
twill-zookeeper/pom.xml | 2 +-
13 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c116322..c4792c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
<groupId>org.apache.twill</groupId>
<artifactId>twill-parent</artifactId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache Twill</name>
<url>http://twill.incubator.apache.org</url>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-api/pom.xml b/twill-api/pom.xml
index 3cb6d97..ee34b84 100644
--- a/twill-api/pom.xml
+++ b/twill-api/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.twill</groupId>
<artifactId>twill-parent</artifactId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>twill-api</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-common/pom.xml
----------------------------------------------------------------------
diff --git a/twill-common/pom.xml b/twill-common/pom.xml
index d1acb55..cd25620 100644
--- a/twill-common/pom.xml
+++ b/twill-common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index e6b5514..88212ec 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-discovery-api/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-api/pom.xml b/twill-discovery-api/pom.xml
index a38aff3..a7723d5 100644
--- a/twill-discovery-api/pom.xml
+++ b/twill-discovery-api/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-discovery-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-discovery-core/pom.xml b/twill-discovery-core/pom.xml
index f80510d..4264ef1 100644
--- a/twill-discovery-core/pom.xml
+++ b/twill-discovery-core/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-examples/echo/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/echo/pom.xml b/twill-examples/echo/pom.xml
index 474127e..6893236 100644
--- a/twill-examples/echo/pom.xml
+++ b/twill-examples/echo/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
<parent>
<artifactId>twill-examples</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<name>Apache Twill examples: Echo</name>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-examples/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/pom.xml b/twill-examples/pom.xml
index 3e2929b..0550876 100644
--- a/twill-examples/pom.xml
+++ b/twill-examples/pom.xml
@@ -25,7 +25,7 @@ limitations under the License.
<parent>
<groupId>org.apache.twill</groupId>
<artifactId>twill-parent</artifactId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>twill-examples</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-examples/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-examples/yarn/pom.xml b/twill-examples/yarn/pom.xml
index dea3777..58d4b99 100644
--- a/twill-examples/yarn/pom.xml
+++ b/twill-examples/yarn/pom.xml
@@ -24,7 +24,7 @@ limitations under the License.
<parent>
<artifactId>twill-examples</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<name>Apache Twill examples: YARN</name>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-ext/pom.xml
----------------------------------------------------------------------
diff --git a/twill-ext/pom.xml b/twill-ext/pom.xml
index 0eb45cd..c5bab6c 100644
--- a/twill-ext/pom.xml
+++ b/twill-ext/pom.xml
@@ -22,7 +22,7 @@ limitations under the License.
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-java8-test/pom.xml
----------------------------------------------------------------------
diff --git a/twill-java8-test/pom.xml b/twill-java8-test/pom.xml
index 65b1c6e..fc8ccae 100644
--- a/twill-java8-test/pom.xml
+++ b/twill-java8-test/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.twill</groupId>
<artifactId>twill-parent</artifactId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>twill-java8-test</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml
index 1df959a..acfacaa 100644
--- a/twill-yarn/pom.xml
+++ b/twill-yarn/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/59bf3883/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
index 9edc990..2f680f7 100644
--- a/twill-zookeeper/pom.xml
+++ b/twill-zookeeper/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>twill-parent</artifactId>
<groupId>org.apache.twill</groupId>
- <version>0.7.0-incubating</version>
+ <version>0.8.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
[05/22] incubator-twill git commit: (TWILL-154) Remove hardcoded check for “hdfs” or “maprfs” URI prefix
Posted by ch...@apache.org.
(TWILL-154) Remove hardcoded check for “hdfs” or “maprfs” URI prefix
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/ef8b1eae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/ef8b1eae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/ef8b1eae
Branch: refs/heads/site
Commit: ef8b1eae804db155dd05b20163e507521694647b
Parents: f88e18f
Author: Terence Yim <ch...@apache.org>
Authored: Fri Oct 9 13:42:22 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Fri Oct 9 15:36:40 2015 -0700
----------------------------------------------------------------------
.../org/apache/twill/internal/ServiceMain.java | 21 +++++++++-----------
.../apache/twill/yarn/YarnTwillPreparer.java | 5 +++--
2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ef8b1eae/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index f7bea24..a6af3d3 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -136,20 +136,17 @@ public abstract class ServiceMain {
return new LocalLocationFactory().create(appDir);
}
- if ("hdfs".equals(appDir.getScheme()) || "maprfs".equals(appDir.getScheme())) {
- if (UserGroupInformation.isSecurityEnabled()) {
- return new HDFSLocationFactory(FileSystem.get(appDir, conf)).create(appDir);
- }
-
- String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
- if (fsUser == null) {
- throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
- }
- return new HDFSLocationFactory(FileSystem.get(appDir, conf, fsUser)).create(appDir);
+ // If not file, assuming it is a FileSystem, hence construct with HDFSLocationFactory which wraps
+ // a FileSystem created from the Configuration
+ if (UserGroupInformation.isSecurityEnabled()) {
+ return new HDFSLocationFactory(FileSystem.get(appDir, conf)).create(appDir);
}
- LOG.warn("Unsupported location type {}.", appDir);
- throw new IllegalArgumentException("Unsupported location type " + appDir);
+ String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
+ if (fsUser == null) {
+ throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
+ }
+ return new HDFSLocationFactory(FileSystem.get(appDir, conf, fsUser)).create(appDir);
} catch (Exception e) {
LOG.error("Failed to create application location for {}.", appDir);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ef8b1eae/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index a444dda..ea77116 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -52,6 +52,7 @@ import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.filesystem.HDFSLocationFactory;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.ApplicationBundler;
@@ -451,8 +452,8 @@ final class YarnTwillPreparer implements TwillPreparer {
Location location;
URI uri = localFile.getURI();
- if ("hdfs".equals(uri.getScheme())) {
- // Assuming the location factory is HDFS one. If it is not, it will failed, which is the correct behavior.
+ if (locationFactory.getHomeLocation().toURI().getScheme().equals(uri.getScheme())) {
+ // If the source file location is having the same scheme as the target location, no need to copy
location = locationFactory.create(uri);
} else {
URL url = uri.toURL();
[15/22] incubator-twill git commit: (TWILL-161) Added back off logic
to SimplyKafkaConsumer
Posted by ch...@apache.org.
(TWILL-161) Added back off logic to SimplyKafkaConsumer
- Avoid excessive amount of polling and logs in case of failure
This closes #76 on GitHub.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b740da43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b740da43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b740da43
Branch: refs/heads/site
Commit: b740da4385ec542268508882e10069832f5ec35c
Parents: 388a6d9
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jan 5 14:04:29 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 5 15:40:33 2016 -0800
----------------------------------------------------------------------
.../kafka/client/SimpleKafkaConsumer.java | 48 +++++++++++---
.../apache/twill/kafka/client/KafkaTest.java | 68 ++++++++++++++++++++
2 files changed, 108 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b740da43/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 230521c..8cfe889 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -72,7 +72,8 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
private static final int SO_TIMEOUT = 5 * 1000; // 5 seconds.
private static final int MAX_WAIT = 1000; // 1 second.
private static final long CONSUMER_EXPIRE_MINUTES = 1L; // close consumer if not used for 1 minute.
- private static final long CONSUMER_FAILURE_RETRY_INTERVAL = 2000L; // Sleep for 2 seconds if failure in consumer.
+ private static final long INIT_CONSUMER_FAILURE_BACKOFF = 100L; // Initial backoff for 100ms if failure in consumer.
+ private static final long MAX_CONSUMER_FAILURE_BACKOFF = 10000L; // Backoff max for 10 seconds if failure in consumer.
private static final long EMPTY_FETCH_WAIT = 500L; // Sleep for 500 ms if no message is fetched.
private final BrokerService brokerService;
@@ -328,16 +329,12 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
final AtomicLong offset = new AtomicLong(startOffset);
Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null;
-
+ ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF,
+ MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS);
while (running) {
if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) {
LOG.debug("No leader for topic partition {}.", topicPart);
- try {
- TimeUnit.MILLISECONDS.sleep(CONSUMER_FAILURE_RETRY_INTERVAL);
- } catch (InterruptedException e) {
- // OK to ignore this, as interrupt would be caused by thread termination.
- LOG.trace("Consumer sleep interrupted.", e);
- }
+ backoff.backoff();
continue;
}
@@ -375,10 +372,12 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
// Call the callback
invokeCallback(messages, offset);
+ backoff.reset();
} catch (Throwable t) {
if (running || !(t instanceof ClosedByInterruptException)) {
// Only log if it is still running, otherwise, it just the interrupt caused by the stop.
LOG.info("Exception when fetching message on {}.", topicPart, t);
+ backoff.backoff();
}
consumers.refresh(consumerEntry.getKey());
consumerEntry = null;
@@ -477,5 +476,38 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
}
};
}
+
+ /**
+ * Helper class for performance exponential backoff on message fetching failure.
+ */
+ private final class ExponentialBackoff {
+ private final long initialBackoff;
+ private final long maxBackoff;
+ private final TimeUnit backoffUnit;
+ private int failureCount = 0;
+
+ private ExponentialBackoff(long initialBackoff, long maxBackoff, TimeUnit backoffUnit) {
+ this.initialBackoff = initialBackoff;
+ this.maxBackoff = maxBackoff;
+ this.backoffUnit = backoffUnit;
+ }
+
+ void backoff() {
+ failureCount++;
+ long multiplier = failureCount > Long.SIZE ? Long.MAX_VALUE : (1L << (failureCount - 1));
+ long backoff = Math.min(initialBackoff * multiplier, maxBackoff);
+ backoff = backoff < 0 ? maxBackoff : backoff;
+ try {
+ backoffUnit.sleep(backoff);
+ } catch (InterruptedException e) {
+ // OK to ignore since this method is called from the consumer thread only, which on thread shutdown,
+ // the thread will be interrupted
+ }
+ }
+
+ void reset() {
+ failureCount = 0;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b740da43/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 93119ab..4ac8ae4 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -42,8 +42,11 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
@@ -84,6 +87,71 @@ public class KafkaTest {
}
@Test
+ public void testKafkaClientReconnect() throws Exception {
+ String topic = "backoff";
+ Properties kafkServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
+ EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkServerConfig);
+
+ ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build();
+ zkClient.startAndWait();
+ try {
+ zkClient.create("/", null, CreateMode.PERSISTENT).get();
+
+ ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
+ kafkaClient.startAndWait();
+
+ try {
+ server.startAndWait();
+ try {
+ // Publish a messages
+ createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start();
+
+ // Creater a consumer
+ final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+ Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
+ .consume(new KafkaConsumer.MessageCallback() {
+ @Override
+ public void onReceived(Iterator<FetchedMessage> messages) {
+ while (messages.hasNext()) {
+ queue.offer(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+ }
+ }
+
+ @Override
+ public void finished() {
+ }
+ });
+
+ // Wait for the first message
+ Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS));
+
+ // Shutdown the server
+ server.stopAndWait();
+
+ // Start the server again.
+ // Needs to create a new instance with the same config since guava service cannot be restarted
+ server = new EmbeddedKafkaServer(kafkServerConfig);
+ server.startAndWait();
+
+ // Publish another message
+ createPublishThread(kafkaClient, topic, Compression.NONE, "Second message", 1).start();
+
+ // Should be able to get the second message
+ Assert.assertEquals("0 Second message", queue.poll(60, TimeUnit.SECONDS));
+
+ cancel.cancel();
+ } finally {
+ kafkaClient.stopAndWait();
+ }
+ } finally {
+ server.stopAndWait();
+ }
+ } finally {
+ zkClient.stopAndWait();
+ }
+ }
+
+ @Test
public void testKafkaClient() throws Exception {
String topic = "testClient";
[21/22] incubator-twill git commit: Update license statement year to
2016
Posted by ch...@apache.org.
Update license statement year to 2016
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/180e446d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/180e446d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/180e446d
Branch: refs/heads/site
Commit: 180e446d23709706bf189fed66529bc2be6c3884
Parents: 59bf388
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jan 26 11:08:53 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 26 11:08:53 2016 -0800
----------------------------------------------------------------------
NOTICE | 2 +-
pom.xml | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/180e446d/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 17c9ea9..a88b8d4 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache Twill
-Copyright 2013-2015 The Apache Software Foundation
+Copyright 2013-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/180e446d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c4792c4..839214c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -234,7 +234,7 @@
<link>http://docs.oracle.com/javase/7/docs/api/</link>
</links>
<bottom>
- <![CDATA[Copyright © 2013-2015 <a href="http://www.apache.org">The Apache Software Foundation</a>. All rights reserved.]]>
+ <![CDATA[Copyright © 2013-2016 <a href="http://www.apache.org">The Apache Software Foundation</a>. All rights reserved.]]>
</bottom>
</configuration>
<executions>
[17/22] incubator-twill git commit: Fix an easy to fail unit-test
Posted by ch...@apache.org.
Fix an easy to fail unit-test
- The test has race condition. The mutual discovery
doesn’t works well as one runnable can be finished
and not discoverable anymore before another one
tries to discover.
- Switch to use an Echo server / client runnable,
which only the client needs to discover the server.
This closes #77 on GitHub.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/900e3829
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/900e3829
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/900e3829
Branch: refs/heads/site
Commit: 900e382938090fb4585389cfa7d0acf020f088e8
Parents: 323e5af
Author: Terence Yim <ch...@apache.org>
Authored: Tue Jan 5 23:46:06 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Wed Jan 6 22:16:31 2016 -0800
----------------------------------------------------------------------
.../twill/yarn/ServiceDiscoveryTestRun.java | 91 +++++++++++---------
.../org/apache/twill/yarn/SocketServer.java | 13 +--
2 files changed, 56 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/900e3829/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
index 59fe835..77f53ad 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
@@ -17,8 +17,10 @@
*/
package org.apache.twill.yarn;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.io.LineReader;
import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.Command;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillController;
@@ -34,9 +36,17 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.io.Reader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -51,18 +61,16 @@ public final class ServiceDiscoveryTestRun extends BaseYarnTest {
TwillController controller = twillRunner
.prepare(new ServiceApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
- .withArguments("r1", "12345")
- .withArguments("r2", "45678")
+ .withApplicationArguments("echo")
.start();
- ServiceDiscovered completed = controller.discoverService("completed");
- Assert.assertTrue(waitForSize(completed, 2, 120));
- controller.sendCommand(Command.Builder.of("done").build());
- controller.awaitTerminated(120, TimeUnit.SECONDS);
+ ServiceDiscovered discovered = controller.discoverService("discovered");
+ Assert.assertTrue(waitForSize(discovered, 1, 120));
+ controller.terminate().get();
}
/**
- * An application that contains two {@link ServiceRunnable}.
+ * An application that contains an EchoServer and an EchoClient.
*/
public static final class ServiceApplication implements TwillApplication {
@@ -71,66 +79,63 @@ public final class ServiceDiscoveryTestRun extends BaseYarnTest {
return TwillSpecification.Builder.with()
.setName("ServiceApp")
.withRunnable()
- .add("r1", new ServiceRunnable()).noLocalFiles()
- .add("r2", new ServiceRunnable()).noLocalFiles()
+ .add("server", new EchoServer()).noLocalFiles()
+ .add("client", new EchoClient()).noLocalFiles()
.anyOrder()
.build();
}
}
/**
- * A Runnable that will announce on service and wait for announcement from another instance in the same service.
+ * A runnable to discover the echo server and issue a call to it.
*/
- public static final class ServiceRunnable extends AbstractTwillRunnable {
+ public static final class EchoClient extends AbstractTwillRunnable {
- private static final Logger LOG = LoggerFactory.getLogger(ServiceRunnable.class);
- private static final String SERVICE_NAME = "service";
- private final CountDownLatch stopLatch = new CountDownLatch(1);
+ private static final Logger LOG = LoggerFactory.getLogger(EchoClient.class);
+
+ private final CountDownLatch completion = new CountDownLatch(1);
@Override
public void run() {
- final int port = Integer.parseInt(getContext().getArguments()[0]);
- Cancellable cancelService = getContext().announce(SERVICE_NAME, port);
-
- final CountDownLatch discoveredLatch = new CountDownLatch(1);
-
- ServiceDiscovered serviceDiscovered = getContext().discover(SERVICE_NAME);
+ final BlockingQueue<Discoverable> discoverables = new LinkedBlockingQueue<>();
+ ServiceDiscovered serviceDiscovered = getContext().discover(getContext().getApplicationArguments()[0]);
serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
@Override
public void onChange(ServiceDiscovered serviceDiscovered) {
- // Try to find a discoverable that is not this instance
- for (Discoverable discoverable : serviceDiscovered) {
- int discoveredPort = discoverable.getSocketAddress().getPort();
- if (SERVICE_NAME.equals(discoverable.getName()) && discoveredPort != port) {
- LOG.info("{}: Service discovered at {}", getContext().getSpecification().getName(), discoveredPort);
- discoveredLatch.countDown();
- }
- }
+ Iterables.addAll(discoverables, serviceDiscovered);
}
}, Threads.SAME_THREAD_EXECUTOR);
try {
- discoveredLatch.await();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted.", e);
- }
+ Discoverable discoverable = discoverables.poll(120, TimeUnit.SECONDS);
+ // Make a call to the echo server
+ InetSocketAddress address = discoverable.getSocketAddress();
+ Socket socket = new Socket(address.getAddress(), address.getPort());
+ String message = "Hello World";
+ try (PrintWriter printer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"))) {
+ printer.println(message);
+ printer.flush();
- // Announce the "complete" service so that the driver knows this runnable has discovered the other
- Cancellable cancelCompleted = getContext().announce("completed", port);
- try {
- stopLatch.await();
- cancelService.cancel();
- cancelCompleted.cancel();
+ try (Reader reader = new InputStreamReader(socket.getInputStream(), "UTF-8")) {
+ LineReader lineReader = new LineReader(reader);
+ String line = lineReader.readLine();
+ Preconditions.checkState(message.equals(line), "Expected %s, got %s", message, line);
+ }
+ }
+
+ Cancellable cancellable = getContext().announce("discovered", 12345);
+ completion.await();
+ cancellable.cancel();
} catch (InterruptedException e) {
LOG.warn("Interrupted.", e);
+ } catch (IOException e) {
+ LOG.error("Failed to talk to server", e);
}
}
@Override
- public void handleCommand(Command command) throws Exception {
- if ("done".equals(command.getCommand())) {
- stopLatch.countDown();
- }
+ public void stop() {
+ completion.countDown();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/900e3829/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
index fd38576..e9e6a99 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
@@ -19,7 +19,6 @@ package org.apache.twill.yarn;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.TwillContext;
import org.apache.twill.common.Cancellable;
@@ -34,6 +33,7 @@ import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -57,10 +57,13 @@ public abstract class SocketServer extends AbstractTwillRunnable {
", id: " + context.getInstanceId() +
", count: " + context.getInstanceCount());
- final List<Cancellable> cancellables = ImmutableList.of(
- context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()),
- context.announce(context.getArguments()[0], serverSocket.getLocalPort())
- );
+ // Announce with service names as specified in app arguments and runnable arguments
+ final List<Cancellable> cancellables = new ArrayList<>();
+ for (String[] args : new String[][] {context.getApplicationArguments(), context.getArguments()}) {
+ if (args.length > 0) {
+ cancellables.add(context.announce(args[0], serverSocket.getLocalPort()));
+ }
+ }
canceller = new Cancellable() {
@Override
public void cancel() {
[12/22] incubator-twill git commit: Simple JavaDoc fix for a typo,
from denoteed to denoted.
Posted by ch...@apache.org.
Simple JavaDoc fix for a typo, from denoteed to denoted.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/65c730b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/65c730b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/65c730b8
Branch: refs/heads/site
Commit: 65c730b82528da669c6abaae96284e18defd8402
Parents: 359b12b
Author: Henry Saputra <hs...@apache.org>
Authored: Sat Dec 26 13:48:39 2015 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Sun Dec 27 10:53:05 2015 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/twill/filesystem/HDFSLocation.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/65c730b8/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
index 2ab97db..818fe23 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -121,7 +121,7 @@ final class HDFSLocation implements Location {
}
/**
- * @return Returns the name of the file or directory denoteed by this abstract pathname.
+ * @return Returns the name of the file or directory denoted by this abstract pathname.
*/
@Override
public String getName() {