You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/01/26 23:32:22 UTC
[4/5] incubator-twill git commit: (TWILL-63) Speed up launch time a bit by reusing appMaster.jar and container.jar if first tier classes doesn’t changed.
(TWILL-63) Speed up launch time a bit by reusing appMaster.jar and container.jar if first tier classes doesn’t changed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/c13e05ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/c13e05ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/c13e05ba
Branch: refs/heads/twill-63
Commit: c13e05ba9b65e2d42f916d4ba6bc428290ca7f7a
Parents: b1adf69
Author: Terence Yim <te...@continuuity.com>
Authored: Fri Jun 20 23:38:12 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 13 11:33:15 2015 -0800
----------------------------------------------------------------------
.../apache/twill/filesystem/LocalLocation.java | 27 +++
.../org/apache/twill/filesystem/Location.java | 17 ++
.../apache/twill/filesystem/LocationStatus.java | 67 ++++++
.../org/apache/twill/filesystem/Locations.java | 85 ++++++++
.../twill/internal/ApplicationBundler.java | 21 +-
.../org/apache/twill/internal/BundleCache.java | 206 +++++++++++++++++++
.../java/org/apache/twill/internal/Bundler.java | 50 +++++
.../org/apache/twill/internal/Constants.java | 24 +++
.../internal/utils/ApplicationBundlerTest.java | 3 +-
.../apache/twill/filesystem/HDFSLocation.java | 31 ++-
.../apache/twill/yarn/YarnTwillPreparer.java | 21 +-
.../twill/yarn/YarnTwillRunnerService.java | 27 ++-
.../org/apache/twill/yarn/YarnTestSuite.java | 21 ++
13 files changed, 567 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
index 6dec09c..9f0f7eb 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocalLocation.java
@@ -211,6 +211,13 @@ final class LocalLocation implements Location {
}
@Override
+ public void setLastModified(long time) throws IOException {
+ if (!file.setLastModified(time)) {
+ throw new IOException("Failed to set last modified time on " + file);
+ }
+ }
+
+ @Override
public boolean isDirectory() throws IOException {
return file.isDirectory();
}
@@ -230,6 +237,21 @@ final class LocalLocation implements Location {
}
@Override
+ public List<LocationStatus> listStatus() throws IOException {
+ File[] files = file.listFiles();
+ ImmutableList.Builder<LocationStatus> result = ImmutableList.builder();
+ if (files != null) {
+ for (File file : files) {
+ Location location = new LocalLocation(locationFactory, file);
+ result.add(new LocationStatus(location, file.isDirectory(), file.lastModified(), file.length()));
+ }
+ } else if (!file.exists()) {
+ throw new FileNotFoundException("File " + file + " does not exist.");
+ }
+ return result.build();
+ }
+
+ @Override
public LocationFactory getLocationFactory() {
return locationFactory;
}
@@ -251,4 +273,9 @@ final class LocalLocation implements Location {
public int hashCode() {
return file.hashCode();
}
+
+ @Override
+ public String toString() {
+ return file.toURI().toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-common/src/main/java/org/apache/twill/filesystem/Location.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/Location.java b/twill-common/src/main/java/org/apache/twill/filesystem/Location.java
index 04edd27..2f5e198 100644
--- a/twill-common/src/main/java/org/apache/twill/filesystem/Location.java
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/Location.java
@@ -154,6 +154,14 @@ public interface Location {
long lastModified() throws IOException;
/**
+ * Sets the last modified time of file.
+ *
+ * @param time The new last-modified time, measured in milliseconds since
+ * the epoch (00:00:00 GMT, January 1, 1970)
+ */
+ void setLastModified(long time) throws IOException;
+
+ /**
* Checks if this location represents a directory.
*
* @return {@code true} if it is a directory, {@code false} otherwise.
@@ -169,6 +177,15 @@ public interface Location {
List<Location> list() throws IOException;
/**
+ * List the locations under this location with status. Each {@link LocationStatus} returned represents
+ * status of the associated {@link Location} at the time when list is performed.
+ *
+ * @return Immutable List of {@link LocationStatus} for locations under this location.
+ * An empty list is returned if this location is not a directory.
+ */
+ List<LocationStatus> listStatus() throws IOException;
+
+ /**
* Returns the location factory used to create this instance.
*
* @return The {@link LocationFactory} instance for creating this instance.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-common/src/main/java/org/apache/twill/filesystem/LocationStatus.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/LocationStatus.java b/twill-common/src/main/java/org/apache/twill/filesystem/LocationStatus.java
new file mode 100644
index 0000000..ef6463b
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/LocationStatus.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+/**
+ * Contains status of a {@link Location}.
+ */
+public class LocationStatus {
+
+ private final Location location;
+ private final boolean directory;
+ private final long lastModified;
+ private final long length;
+
+ /**
+ * Constructs a location status.
+ */
+ public LocationStatus(Location location, boolean directory, long lastModified, long length) {
+ this.location = location;
+ this.directory = directory;
+ this.lastModified = lastModified;
+ this.length = length;
+ }
+
+ /**
+ * Returns the {@link Location} that this status is for.
+ */
+ public Location getLocation() {
+ return location;
+ }
+
+ /**
+ * Returns {@code true} if the location is a directory, {@code false} otherwise.
+ */
+ public boolean isDirectory() {
+ return directory;
+ }
+
+ /**
+ * Returns the last modified timestamp in milliseconds for the location.
+ */
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ /**
+ * Returns the length of the file represented by the location.
+ */
+ public long getLength() {
+ return length;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-common/src/main/java/org/apache/twill/filesystem/Locations.java
----------------------------------------------------------------------
diff --git a/twill-common/src/main/java/org/apache/twill/filesystem/Locations.java b/twill-common/src/main/java/org/apache/twill/filesystem/Locations.java
new file mode 100644
index 0000000..3eecd4c
--- /dev/null
+++ b/twill-common/src/main/java/org/apache/twill/filesystem/Locations.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import com.google.common.io.InputSupplier;
+import com.google.common.io.OutputSupplier;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import javax.annotation.Nullable;
+
+/**
+ * Utility class for interacting with {@link Location}.
+ */
+public final class Locations {
+
+ /**
+ * Creates a new {@link InputSupplier} that can provides {@link InputStream} from the given location.
+ *
+ * @param location Location for the input stream.
+ * @return A {@link InputSupplier}.
+ */
+ public static InputSupplier<InputStream> newInputSupplier(final Location location) {
+ return new InputSupplier<InputStream>() {
+ @Override
+ public InputStream getInput() throws IOException {
+ return location.getInputStream();
+ }
+ };
+ }
+
+ /**
+ * Creates a new {@link OutputSupplier} that can provides {@link OutputStream} for the given location.
+ *
+ * @param location Location for the output.
+ * @return A {@link com.google.common.io.OutputSupplier}.
+ */
+ public static OutputSupplier<OutputStream> newOutputSupplier(final Location location) {
+ return new OutputSupplier<OutputStream>() {
+ @Override
+ public OutputStream getOutput() throws IOException {
+ return location.getOutputStream();
+ }
+ };
+ }
+
+ /**
+ * Creates a {@link Location} instance which represents the parent of the given location.
+ *
+ * @param location location to extra parent from.
+ * @return an instance representing the parent location or {@code null} if there is no parent.
+ */
+ @Nullable
+ public static Location getParent(Location location) {
+ URI source = location.toURI();
+
+ // If it is root, return null
+ if ("/".equals(source.getPath())) {
+ return null;
+ }
+
+ URI resolvedParent = URI.create(source.toString() + "/..").normalize();
+ return location.getLocationFactory().create(resolvedParent);
+ }
+
+ private Locations() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
index 3f2d073..8a482e2 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
@@ -54,7 +54,7 @@ import java.util.zip.CheckedOutputStream;
/**
* This class builds jar files based on class dependencies.
*/
-public final class ApplicationBundler {
+public final class ApplicationBundler implements Bundler {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationBundler.class);
@@ -102,28 +102,17 @@ public final class ApplicationBundler {
}
+ @Override
public void createBundle(Location target, Iterable<Class<?>> classes) throws IOException {
createBundle(target, classes, ImmutableList.<URI>of());
}
- /**
- * Same as calling {@link #createBundle(Location, Iterable)}.
- */
- public void createBundle(Location target, Class<?> clz, Class<?>...classes) throws IOException {
+ @Override
+ public void createBundle(Location target, Class<?> clz, Class<?>... classes) throws IOException {
createBundle(target, ImmutableSet.<Class<?>>builder().add(clz).add(classes).build());
}
- /**
- * Creates a jar file which includes all the given classes and all the classes that they depended on.
- * The jar will also include all classes and resources under the packages as given as include packages
- * in the constructor.
- *
- * @param target Where to save the target jar file.
- * @param resources Extra resources to put into the jar file. If resource is a jar file, it'll be put under
- * lib/ entry, otherwise under the resources/ entry.
- * @param classes Set of classes to start the dependency traversal.
- * @throws IOException
- */
+ @Override
public void createBundle(Location target, Iterable<Class<?>> classes, Iterable<URI> resources) throws IOException {
LOG.debug("start creating bundle {}. building a temporary file locally at first", target.getName());
// Write the jar to local tmp file first
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-core/src/main/java/org/apache/twill/internal/BundleCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/BundleCache.java b/twill-core/src/main/java/org/apache/twill/internal/BundleCache.java
new file mode 100644
index 0000000..82f9142
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/BundleCache.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.InputSupplier;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.twill.api.RunId;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.filesystem.LocationStatus;
+import org.apache.twill.filesystem.Locations;
+import org.objectweb.asm.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Cache for generated bundle jars.
+ */
+public final class BundleCache extends AbstractIdleService implements Bundler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BundleCache.class);
+ private static final String ACTIVE_FILE_NAME = "active";
+
+ private static final Function<Class<?>, InputSupplier<InputStream>> CLASS_INPUT =
+ new Function<Class<?>, InputSupplier<InputStream>>() {
+ @Override
+ public InputSupplier<InputStream> apply(final Class<?> clz) {
+ return new InputSupplier<InputStream>() {
+ @Override
+ public InputStream getInput() throws IOException {
+ return clz.getClassLoader().getResourceAsStream(Type.getInternalName(clz) + ".class");
+ }
+ };
+ }
+ };
+
+ private static final Function<URI, InputSupplier<InputStream>> URI_INPUT =
+ new Function<URI, InputSupplier<InputStream>>() {
+ @Override
+ public InputSupplier<InputStream> apply(final URI uri) {
+ return new InputSupplier<InputStream>() {
+ @Override
+ public InputStream getInput() throws IOException {
+ return uri.toURL().openStream();
+ }
+ };
+ }
+ };
+
+ private final Location cacheDir;
+ private final Bundler bundler;
+ private final long cleanupPeriodMs;
+ private final int maxEntries;
+ private final RunId sessionId;
+ private Timer timer;
+
+ public BundleCache(Location cacheDir, Bundler bundler, long cleanupPeriodMs, int maxEntries) {
+ this.cacheDir = cacheDir;
+ this.bundler = bundler;
+ this.cleanupPeriodMs = cleanupPeriodMs;
+ this.maxEntries = maxEntries;
+ this.sessionId = RunIds.generate();
+ }
+
+ @Override
+ public void createBundle(Location target, Iterable<Class<?>> classes) throws IOException {
+ createBundle(target, classes, ImmutableList.<URI>of());
+ }
+
+ @Override
+ public void createBundle(Location target, Class<?> clz, Class<?>... classes) throws IOException {
+ createBundle(target, ImmutableSet.<Class<?>>builder().add(clz).add(classes).build());
+ }
+
+ @Override
+ public void createBundle(Location target, Iterable<Class<?>> classes, Iterable<URI> resources) throws IOException {
+ // Compute hash of all classes
+ Iterable<InputSupplier<InputStream>> inputSuppliers = Iterables.concat(Iterables.transform(classes, CLASS_INPUT),
+ Iterables.transform(resources, URI_INPUT));
+ String hash = ByteStreams.hash(ByteStreams.join(inputSuppliers), Hashing.sha256()).toString();
+
+ // See if there is a cached copy
+ Location location = cacheDir.append(sessionId.getId()).append(hash);
+ if (!location.exists()) {
+ // No cache, call the application bundler.
+ Location tmpLocation = location.getTempFile(".tmp");
+ bundler.createBundle(tmpLocation, classes);
+
+ tmpLocation.renameTo(location);
+ } else {
+ LOG.debug("Cached bundle found for {}.", target);
+ // Touch the file with current time. It's for cache cleanup process to remove old cache.
+ location.setLastModified(System.currentTimeMillis());
+ }
+
+ // Copy from the cached location to the target
+ ByteStreams.copy(Locations.newInputSupplier(location), Locations.newOutputSupplier(target));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ // Create the session and touch the active file inside the session directory.
+ // It is needed for cleanup check.
+ final Location activeFile = cacheDir.append(sessionId.getId()).append(ACTIVE_FILE_NAME);
+ activeFile.createNew();
+
+ // Schedule the cleanup task
+ timer = new Timer("bundle-cache-cleanup", true);
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ long now = System.currentTimeMillis();
+
+ // Touch the active file
+ activeFile.setLastModified(now);
+
+ // Remove all directories that don't have active file or it has not been updated 4 * the cleanup cycle
+ List<Location> locations = cacheDir.list();
+ long expireTime = now - cleanupPeriodMs * 4;
+ for (Location location : locations) {
+ // No need to handle current session
+ if (location.getName().equals(sessionId.getId())) {
+ continue;
+ }
+
+ Location active = location.append(ACTIVE_FILE_NAME);
+ try {
+ if (!active.exists() || active.lastModified() <= expireTime) {
+ location.delete(true);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to cleanup location {}", location, e);
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("Failed to perform bundle cache cleanup.", t);
+ }
+
+ try {
+ // Handle current session cleanup
+ List<LocationStatus> statusList = cacheDir.append(sessionId.getId()).listStatus();
+ if (statusList.size() <= maxEntries) {
+ return;
+ }
+
+ // Sort the cached entries based on time.
+ statusList = Lists.newArrayList(statusList);
+ Collections.sort(statusList, new Comparator<LocationStatus>() {
+ @Override
+ public int compare(LocationStatus o1, LocationStatus o2) {
+ return Longs.compare(o1.getLastModified(), o2.getLastModified());
+ }
+ });
+
+ // Remove N cached entries with smallest last modified time.
+ int toIndex = statusList.size() - maxEntries;
+ for (LocationStatus status : statusList.subList(0, toIndex)) {
+ status.getLocation().delete();
+ }
+
+ } catch (Throwable t) {
+ LOG.error("Failed to perform bundle cache cleanup.", t);
+ }
+ }
+ }, 0L, cleanupPeriodMs);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ timer.cancel();
+ Location activeFile = cacheDir.append(sessionId.getId()).append(ACTIVE_FILE_NAME);
+ activeFile.delete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-core/src/main/java/org/apache/twill/internal/Bundler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Bundler.java b/twill-core/src/main/java/org/apache/twill/internal/Bundler.java
new file mode 100644
index 0000000..7c42bd4
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/Bundler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.filesystem.Location;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ *
+ */
+public interface Bundler {
+
+
+ void createBundle(Location target, Iterable<Class<?>> classes) throws IOException;
+
+ /**
+ * Same as calling {@link #createBundle(org.apache.twill.filesystem.Location, Iterable)}.
+ */
+ void createBundle(Location target, Class<?> clz, Class<?>... classes) throws IOException;
+
+ /**
+ * Creates a jar file which includes all the given classes and all the classes that they depended on.
+ * The jar will also include all classes and resources under the packages as given as include packages
+ * in the constructor.
+ *
+ * @param target Where to save the target jar file.
+ * @param resources Extra resources to put into the jar file. If resource is a jar file, it'll be put under
+ * lib/ entry, otherwise under the resources/ entry.
+ * @param classes Set of classes to start the dependency traversal.
+ * @throws java.io.IOException
+ */
+ void createBundle(Location target, Iterable<Class<?>> classes, Iterable<URI> resources) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index fbc6e70..d67545f 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -64,6 +64,30 @@ public final class Constants {
}
}
+ /**
+ * Constants related to {@link org.apache.twill.internal.BundleCache}.
+ */
+ public static final class BundleCache {
+
+ /** Key in Configuration for enable usage of bundle cache. */
+ public static final String ENABLE = "twill.bundle.cache.enable";
+
+ /** Key in Configuration for how often to perform cache cleanup. */
+ public static final String CLEANUP_SECONDS = "twill.bundle.cache.cleanup.seconds";
+
+ /** Key in Configuration for maximum cached entries to maintain. */
+ public static final String MAX_ENTRIES = "twill.bundle.cache.cleanup.max.entries";
+
+ // For bundle jar caching
+ public static final String CACHE_DIR = ".cached";
+
+ /** By default how often does the bundle cache cleanup run. */
+ public static final long DEFAULT_CLEANUP_SECONDS = 1800;
+
+ /** By default the max. number of entries in the cache. */
+ public static final int DEFAULT_MAX_ENTRIES = 1000;
+ }
+
private Constants() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
index 38d0a05..1debc69 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java
@@ -24,6 +24,7 @@ import com.google.common.io.Files;
import org.apache.twill.filesystem.LocalLocationFactory;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.ApplicationBundler;
+import org.apache.twill.internal.Bundler;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -52,7 +53,7 @@ public class ApplicationBundlerTest {
Location location = new LocalLocationFactory(tmpDir.newFolder()).create("test.jar");
// Create a jar file with by tracing dependency
- ApplicationBundler bundler = new ApplicationBundler(ImmutableList.<String>of());
+ Bundler bundler = new ApplicationBundler(ImmutableList.<String>of());
bundler.createBundle(location, ApplicationBundler.class);
File targetDir = tmpDir.newFolder();
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
index ed86cb8..7edbe8c 100644
--- a/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
+++ b/twill-yarn/src/main/java/org/apache/twill/filesystem/HDFSLocation.java
@@ -17,8 +17,10 @@
*/
package org.apache.twill.filesystem;
+import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +40,15 @@ import java.util.UUID;
* A concrete implementation of {@link Location} for the HDFS filesystem.
*/
final class HDFSLocation implements Location {
+
+ private static final Function<LocationStatus, Location> STATUS_TO_LOCATION =
+ new Function<LocationStatus, Location>() {
+ @Override
+ public Location apply(LocationStatus input) {
+ return input.getLocation();
+ }
+ };
+
private final FileSystem fs;
private final Path path;
private final HDFSLocationFactory locationFactory;
@@ -198,18 +209,29 @@ final class HDFSLocation implements Location {
}
@Override
+ public void setLastModified(long time) throws IOException {
+ fs.setTimes(path, time, -1);
+ }
+
+ @Override
public boolean isDirectory() throws IOException {
return fs.isDirectory(path);
}
@Override
public List<Location> list() throws IOException {
+ return Lists.transform(listStatus(), STATUS_TO_LOCATION);
+ }
+
+ @Override
+ public List<LocationStatus> listStatus() throws IOException {
FileStatus[] statuses = fs.listStatus(path);
- ImmutableList.Builder<Location> result = ImmutableList.builder();
+ ImmutableList.Builder<LocationStatus> result = ImmutableList.builder();
if (statuses != null) {
for (FileStatus status : statuses) {
if (!Objects.equal(path, status.getPath())) {
- result.add(new HDFSLocation(locationFactory, status.getPath()));
+ Location location = new HDFSLocation(locationFactory, status.getPath());
+ result.add(new LocationStatus(location, status.isDirectory(), status.getModificationTime(), status.getLen()));
}
}
}
@@ -238,4 +260,9 @@ final class HDFSLocation implements Location {
public int hashCode() {
return path.hashCode();
}
+
+ @Override
+ public String toString() {
+ return path.toUri().toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 0e0fc75..d9a2b08 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -53,8 +53,8 @@ import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
-import org.apache.twill.internal.ApplicationBundler;
import org.apache.twill.internal.Arguments;
+import org.apache.twill.internal.Bundler;
import org.apache.twill.internal.Configs;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.DefaultLocalFile;
@@ -114,6 +114,7 @@ final class YarnTwillPreparer implements TwillPreparer {
private final LocationFactory locationFactory;
private final YarnTwillControllerFactory controllerFactory;
private final RunId runId;
+ private final Bundler bundler;
private final List<LogHandler> logHandlers = Lists.newArrayList();
private final List<String> arguments = Lists.newArrayList();
@@ -127,9 +128,8 @@ final class YarnTwillPreparer implements TwillPreparer {
private String extraOptions;
private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
- YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec,
- YarnAppClient yarnAppClient, ZKClient zkClient,
- LocationFactory locationFactory, String extraOptions,
+ YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, YarnAppClient yarnAppClient,
+ ZKClient zkClient, LocationFactory locationFactory, String extraOptions, Bundler bundler,
YarnTwillControllerFactory controllerFactory) {
this.yarnConfig = yarnConfig;
this.twillSpec = twillSpec;
@@ -138,6 +138,7 @@ final class YarnTwillPreparer implements TwillPreparer {
this.locationFactory = locationFactory;
this.controllerFactory = controllerFactory;
this.runId = RunIds.generate();
+ this.bundler = bundler;
this.credentials = createCredentials();
this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
@@ -260,8 +261,8 @@ final class YarnTwillPreparer implements TwillPreparer {
// Local files declared by runnables
Multimap<String, LocalFile> runnableLocalFiles = HashMultimap.create();
- createAppMasterJar(createBundler(), localFiles);
- createContainerJar(createBundler(), localFiles);
+ createAppMasterJar(bundler, localFiles);
+ createContainerJar(bundler, localFiles);
populateRunnableLocalFiles(twillSpec, runnableLocalFiles);
saveSpecification(twillSpec, runnableLocalFiles, localFiles);
saveLogback(localFiles);
@@ -337,10 +338,6 @@ final class YarnTwillPreparer implements TwillPreparer {
return credentials;
}
- private ApplicationBundler createBundler() {
- return new ApplicationBundler(ImmutableList.<String>of());
- }
-
private LocalFile createLocalFile(String name, Location location) throws IOException {
return createLocalFile(name, location, false);
}
@@ -349,7 +346,7 @@ final class YarnTwillPreparer implements TwillPreparer {
return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null);
}
- private void createAppMasterJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+ private void createAppMasterJar(Bundler bundler, Map<String, LocalFile> localFiles) throws IOException {
try {
LOG.debug("Create and copy {}", Constants.Files.APP_MASTER_JAR);
Location location = createTempLocation(Constants.Files.APP_MASTER_JAR);
@@ -374,7 +371,7 @@ final class YarnTwillPreparer implements TwillPreparer {
}
}
- private void createContainerJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
+ private void createContainerJar(Bundler bundler, Map<String, LocalFile> localFiles) throws IOException {
try {
Set<Class<?>> classes = Sets.newIdentityHashSet();
classes.add(TwillContainerMain.class);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 6a33131..d8a3480 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -64,6 +65,9 @@ import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.HDFSLocationFactory;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
+import org.apache.twill.internal.ApplicationBundler;
+import org.apache.twill.internal.BundleCache;
+import org.apache.twill.internal.Bundler;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.RunIds;
@@ -128,6 +132,7 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
private final ZKClientService zkClientService;
private final LocationFactory locationFactory;
private final Table<String, RunId, YarnTwillController> controllers;
+ private final Bundler bundler;
private ScheduledExecutorService secureStoreScheduler;
private Iterable<LiveInfo> liveInfos;
@@ -159,6 +164,18 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
this.locationFactory = locationFactory;
this.zkClientService = getZKClientService(zkConnect);
this.controllers = HashBasedTable.create();
+
+ Bundler bundler = new ApplicationBundler(ImmutableList.<String>of());
+ if (config.getBoolean(Constants.BundleCache.ENABLE, false)) {
+ LOG.info("Use bundle cache");
+ long cleanupMs = TimeUnit.SECONDS.toMillis(config.getLong(Constants.BundleCache.CLEANUP_SECONDS,
+ Constants.BundleCache.DEFAULT_CLEANUP_SECONDS));
+ int maxEntries = config.getInt(Constants.BundleCache.MAX_ENTRIES, Constants.BundleCache.DEFAULT_MAX_ENTRIES);
+
+ bundler = new BundleCache(locationFactory.create(Constants.BundleCache.CACHE_DIR),
+ bundler, cleanupMs, maxEntries);
+ }
+ this.bundler = bundler;
}
/**
@@ -253,8 +270,8 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
final TwillSpecification twillSpec = application.configure();
final String appName = twillSpec.getName();
- return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions,
- new YarnTwillControllerFactory() {
+ return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService,
+ locationFactory, jvmOptions, bundler, new YarnTwillControllerFactory() {
@Override
public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers,
Callable<ProcessController<YarnApplicationReport>> startUp) {
@@ -298,6 +315,9 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
protected void startUp() throws Exception {
yarnAppClient.startAndWait();
zkClientService.startAndWait();
+ if (bundler instanceof Service) {
+ ((Service) bundler).startAndWait();
+ }
// Create the root node, so that the namespace root would get created if it is missing
// If the exception is caused by node exists, then it's ok. Otherwise propagate the exception.
@@ -334,6 +354,9 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
}
}
watchCancellable.cancel();
+ if (bundler instanceof Service) {
+ ((Service) bundler).stopAndWait();
+ }
zkClientService.stopAndWait();
yarnAppClient.stopAndWait();
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/c13e05ba/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index 8427041..59884c9 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -17,9 +17,15 @@
*/
package org.apache.twill.yarn;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
+import java.io.IOException;
+
/**
* Test suite for all tests with mini yarn cluster.
*/
@@ -42,4 +48,19 @@ import org.junit.runners.Suite;
})
public final class YarnTestSuite {
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void init() throws IOException {
+ YarnTestUtils.init(tmpFolder);
+ YarnTestUtils.runInit = false;
+ YarnTestUtils.runFinish = false;
+ }
+
+ @AfterClass
+ public static void finish() {
+ YarnTestUtils.runFinish = true;
+ YarnTestUtils.finish();
+ }
}