You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 23:00:00 UTC
[18/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java b/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
new file mode 100644
index 0000000..7313d33
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.json.StackTraceElementCodec;
+import org.apache.twill.internal.json.StateNodeCodec;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.state.MessageCodec;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKOperations;
+import com.google.common.base.Charsets;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Service;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link Service} decorator that wraps another {@link Service} with the service states reflected
+ * to ZooKeeper.
+ */
+public final class ZKServiceDecorator extends AbstractService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecorator.class);
+
+ private final ZKClient zkClient;
+ private final RunId id;
+ private final Supplier<? extends JsonElement> liveNodeData;
+ private final Service decoratedService;
+ private final MessageCallbackCaller messageCallback;
+ // Created in doStart() and shut down in doStop(); message callbacks and stop commands run on it.
+ private ExecutorService callbackExecutor;
+
+
+ /**
+ * Creates a ZKServiceDecorator without a finalizer by delegating to the five-argument constructor
+ * with a {@code null} finalizer.
+ */
+ public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier<? extends JsonElement> liveNodeData,
+ Service decoratedService) {
+ this(zkClient, id, liveNodeData, decoratedService, null);
+ }
+
+ /**
+  * Builds a decorator around the given service.
+  *
+  * @param zkClient ZooKeeper client used for all node operations
+  * @param id The run id of the service; it is used to build the ZooKeeper paths
+  * @param liveNodeData A supplier of the JSON stored under the "data" key of the live node
+  * @param decoratedService The service being wrapped; when it also implements {@link MessageCallback},
+  *                         received messages are handed to it
+  * @param finalizer An optional Runnable to run once this decorator has terminated or failed
+  */
+ public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier <? extends JsonElement> liveNodeData,
+                           Service decoratedService, @Nullable Runnable finalizer) {
+   this.zkClient = zkClient;
+   this.id = id;
+   this.liveNodeData = liveNodeData;
+   this.decoratedService = decoratedService;
+   // Only a service that implements MessageCallback gets messages delivered to it.
+   this.messageCallback = (decoratedService instanceof MessageCallback)
+     ? new MessageCallbackCaller((MessageCallback) decoratedService, zkClient)
+     : new MessageCallbackCaller(zkClient);
+
+   if (finalizer == null) {
+     return;
+   }
+   addFinalizer(finalizer);
+ }
+
+ /**
+  * Recursively deletes the given ZK path (a missing node is not treated as an error) and then
+  * creates the path again with the given data and create mode.
+  *
+  * @return a future carrying the result of the create operation
+  */
+ private ListenableFuture<String> deleteAndCreate(final String path, final byte[] data, final CreateMode mode) {
+   // A NoNodeException from the delete is ignored and replaced by a null result.
+   ListenableFuture<String> deletion = ZKOperations.ignoreError(
+     ZKOperations.recursiveDelete(zkClient, path), KeeperException.NoNodeException.class, null);
+
+   AsyncFunction<String, String> recreate = new AsyncFunction<String, String>() {
+     @Override
+     public ListenableFuture<String> apply(String input) throws Exception {
+       return zkClient.create(path, data, mode);
+     }
+   };
+   return Futures.transform(deletion, recreate, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
+ * Starts this decorator: creates the message callback executor and the live node, then recreates the
+ * "messages" and "state" nodes, and finally starts the decorated service. Any failure along the way
+ * is reported through notifyFailed.
+ */
+ @Override
+ protected void doStart() {
+ callbackExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("message-callback"));
+ Futures.addCallback(createLiveNode(), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ // Create nodes for states and messaging
+ StateNode stateNode = new StateNode(ServiceController.State.STARTING);
+
+ final ListenableFuture<List<String>> createFuture = Futures.allAsList(
+ deleteAndCreate(getZKPath("messages"), null, CreateMode.PERSISTENT),
+ deleteAndCreate(getZKPath("state"), encodeStateNode(stateNode), CreateMode.PERSISTENT)
+ );
+
+ createFuture.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // get() rethrows a failure of either node creation so that it reaches the catch block below.
+ createFuture.get();
+ // Starts the decorated service
+ // notifyStarted() is not called here; the listener from createListener() calls it
+ // once the decorated service reports running.
+ decoratedService.addListener(createListener(), Threads.SAME_THREAD_EXECUTOR);
+ decoratedService.start();
+ } catch (Exception e) {
+ notifyFailed(e);
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ notifyFailed(t);
+ }
+ });
+ }
+
+ /**
+ * Requests the decorated service to stop and shuts down the message callback executor with
+ * shutdownNow(). The call does not wait for the decorated service to finish stopping.
+ */
+ @Override
+ protected void doStop() {
+ // Stops the decorated service
+ decoratedService.stop();
+ callbackExecutor.shutdownNow();
+ }
+
+ private void addFinalizer(final Runnable finalizer) {
+ addListener(new ServiceListenerAdapter() {
+ @Override
+ public void terminated(State from) {
+ try {
+ finalizer.run();
+ } catch (Throwable t) {
+ LOG.warn("Exception when running finalizer.", t);
+ }
+ }
+
+ @Override
+ public void failed(State from, Throwable failure) {
+ try {
+ finalizer.run();
+ } catch (Throwable t) {
+ LOG.warn("Exception when running finalizer.", t);
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ private OperationFuture<String> createLiveNode() {
+ String liveNode = getLiveNodePath();
+ LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
+
+ JsonObject content = new JsonObject();
+ content.add("data", liveNodeData.get());
+ return ZKOperations.ignoreError(zkClient.create(liveNode, encodeJson(content), CreateMode.EPHEMERAL),
+ KeeperException.NodeExistsException.class, liveNode);
+ }
+
+ private OperationFuture<String> removeLiveNode() {
+ String liveNode = getLiveNodePath();
+ LOG.info("Remove live node {}{}", zkClient.getConnectString(), liveNode);
+ return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
+ }
+
+ private OperationFuture<String> removeServiceNode() {
+ String serviceNode = String.format("/%s", id.getId());
+ LOG.info("Remove service node {}{}", zkClient.getConnectString(), serviceNode);
+ return ZKOperations.recursiveDelete(zkClient, serviceNode);
+ }
+
+ /**
+ * Lists the children of the "messages" node and hands each one to processMessage in sorted name
+ * order. The watcher passed along calls this method again on a NodeChildrenChanged event while the
+ * decorated service is still running.
+ */
+ private void watchMessages() {
+ final String messagesPath = getZKPath("messages");
+ Futures.addCallback(zkClient.getChildren(messagesPath, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO: Do we need to deal with other type of events?
+ if (event.getType() == Event.EventType.NodeChildrenChanged && decoratedService.isRunning()) {
+ watchMessages();
+ }
+ }
+ }), new FutureCallback<NodeChildren>() {
+ @Override
+ public void onSuccess(NodeChildren result) {
+ // Sort by the name, which is the messageId. Assumption is that message ids is ordered by time.
+ // NOTE(review): every child currently present is dispatched on each invocation, including
+ // messages whose node has not been deleted yet by an earlier pass - confirm that handling a
+ // message more than once is acceptable.
+ List<String> messages = Lists.newArrayList(result.getChildren());
+ Collections.sort(messages);
+ for (String messageId : messages) {
+ processMessage(messagesPath + "/" + messageId, messageId);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // TODO: what could be done besides just logging?
+ LOG.error("Failed to watch messages.", t);
+ }
+ });
+ }
+
+ /**
+ * Reads the message node at the given path and handles its content: a message that cannot be decoded
+ * is deleted, a system stop command triggers the stop sequence, and any other message is passed to
+ * the message callback.
+ *
+ * @param path full ZK path of the message node
+ * @param messageId name of the message node
+ */
+ private void processMessage(final String path, final String messageId) {
+ Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
+ @Override
+ public void onSuccess(NodeData result) {
+ Message message = MessageCodec.decode(result.getData());
+ if (message == null) {
+ LOG.error("Failed to decode message for " + messageId + " in " + path);
+ // Delete the node at the version that was read; a failed delete is only logged.
+ listenFailure(zkClient.delete(path, result.getStat().getVersion()));
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Message received from " + path + ": " + new String(MessageCodec.encode(message), Charsets.UTF_8));
+ }
+ // For a stop command the message node is deleted through the supplier by the stop sequence.
+ if (handleStopMessage(message, getDeleteSupplier(path, result.getStat().getVersion()))) {
+ return;
+ }
+ messageCallback.onReceived(callbackExecutor, path, result.getStat().getVersion(), messageId, message);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Failed to fetch message content.", t);
+ }
+ });
+ }
+
+ /**
+  * Checks whether the message is the system stop command and, if so, schedules the stop sequence on
+  * the callback executor: the decorated service is stopped first, then the operation from the
+  * supplier is issued and this decorator is stopped through stopServiceOnComplete.
+  *
+  * @param message the decoded message
+  * @param postHandleSupplier supplies the operation to issue once the decorated service has stopped
+  * @return {@code true} if the message is a stop command, {@code false} otherwise
+  */
+ private <V> boolean handleStopMessage(Message message, final Supplier<OperationFuture<V>> postHandleSupplier) {
+   boolean stopRequested = message.getType() == Message.Type.SYSTEM
+     && SystemMessages.STOP_COMMAND.equals(message.getCommand());
+   if (!stopRequested) {
+     return false;
+   }
+
+   // Runs once the stop future of the decorated service completes.
+   final Runnable afterStop = new Runnable() {
+     @Override
+     public void run() {
+       stopServiceOnComplete(postHandleSupplier.get(), ZKServiceDecorator.this);
+     }
+   };
+
+   callbackExecutor.execute(new Runnable() {
+     @Override
+     public void run() {
+       decoratedService.stop().addListener(afterStop, MoreExecutors.sameThreadExecutor());
+     }
+   });
+   return true;
+ }
+
+
+ /**
+ * Returns a supplier that, each time it is asked, issues a delete of the given path at the given
+ * version. Nothing is deleted until {@code get()} is called.
+ */
+ private Supplier<OperationFuture<String>> getDeleteSupplier(final String path, final int version) {
+ return new Supplier<OperationFuture<String>>() {
+ @Override
+ public OperationFuture<String> get() {
+ return zkClient.delete(path, version);
+ }
+ };
+ }
+
+ /**
+ * Creates a new listener to be registered on the decorated service.
+ */
+ private Listener createListener() {
+ return new DecoratedServiceListener();
+ }
+
+ /**
+  * Serializes the given object to JSON as the given type and returns the UTF-8 bytes. The Gson
+  * instance used has codecs registered for {@link StateNode} and {@link StackTraceElement}.
+  */
+ private <V> byte[] encode(V data, Class<? extends V> clz) {
+   GsonBuilder builder = new GsonBuilder();
+   builder.registerTypeAdapter(StateNode.class, new StateNodeCodec());
+   builder.registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec());
+
+   String json = builder.create().toJson(data, clz);
+   return json.getBytes(Charsets.UTF_8);
+ }
+
+ /**
+ * Encodes the given state node through {@code encode}, using {@code StateNode.class} as the type.
+ */
+ private byte[] encodeStateNode(StateNode stateNode) {
+ return encode(stateNode, StateNode.class);
+ }
+
+ /**
+ * Converts the given JSON element to its JSON text with a default Gson and returns the UTF-8 bytes.
+ */
+ private <V extends JsonElement> byte[] encodeJson(V json) {
+ return new Gson().toJson(json).getBytes(Charsets.UTF_8);
+ }
+
+ /**
+ * Returns the ZK path {@code /<id>/<path>} for a node that lives under this service's run id.
+ * NOTE(review): this formats the RunId object itself while removeServiceNode() uses id.getId();
+ * presumably both give the same string - confirm.
+ */
+ private String getZKPath(String path) {
+ return String.format("/%s/%s", id, path);
+ }
+
+ /**
+ * Returns the ZK path of the live node, which is {@code /instances/<id>}.
+ */
+ private String getLiveNodePath() {
+ return "/instances/" + id;
+ }
+
+ /**
+  * Attaches a listener to the given operation that logs an error if the operation fails.
+  * A cancelled operation is not reported.
+  *
+  * @return the same future that was passed in
+  */
+ private static <V> OperationFuture<V> listenFailure(final OperationFuture<V> operationFuture) {
+   Runnable failureLogger = new Runnable() {
+     @Override
+     public void run() {
+       if (operationFuture.isCancelled()) {
+         return;
+       }
+       try {
+         // The result is not needed; get() is called only to surface a failure.
+         operationFuture.get();
+       } catch (Exception e) {
+         // TODO: what could be done besides just logging?
+         LOG.error("Operation execution failed for " + operationFuture.getRequestPath(), e);
+       }
+     }
+   };
+   operationFuture.addListener(failureLogger, Threads.SAME_THREAD_EXECUTOR);
+   return operationFuture;
+ }
+
+ /**
+ * Delivers a received message to an optional {@link MessageCallback} and deletes the message node
+ * afterwards. Without a callback the message node is simply deleted.
+ */
+ private static final class MessageCallbackCaller {
+ // null when the decorated service does not handle messages.
+ private final MessageCallback callback;
+ private final ZKClient zkClient;
+
+ private MessageCallbackCaller(ZKClient zkClient) {
+ this(null, zkClient);
+ }
+
+ private MessageCallbackCaller(MessageCallback callback, ZKClient zkClient) {
+ this.callback = callback;
+ this.zkClient = zkClient;
+ }
+
+ /**
+ * Handles one received message.
+ *
+ * @param executor executor on which the callback is invoked
+ * @param path ZK path of the message node
+ * @param version version of the message node to use when deleting it
+ * @param id the message id
+ * @param message the decoded message
+ */
+ public void onReceived(Executor executor, final String path,
+ final int version, final String id, final Message message) {
+ if (callback == null) {
+ // Simply delete the message
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring incoming message from " + path + ": " + message);
+ }
+ listenFailure(zkClient.delete(path, version));
+ return;
+ }
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Message process is synchronous for now. Making it async needs more thoughts about race conditions.
+ // The executor is the callbackExecutor which is a single thread executor.
+ callback.onReceived(id, message).get();
+ } catch (Throwable t) {
+ LOG.error("Exception when processing message: {}, {}, {}", id, message, path, t);
+ } finally {
+ // The message node is deleted whether or not the callback succeeded.
+ listenFailure(zkClient.delete(path, version));
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Listener on the decorated service that writes each state change to the "state" node and drives the
+ * state of the enclosing decorator (notifyStarted, notifyStopped, notifyFailed).
+ */
+ private final class DecoratedServiceListener implements Listener {
+ // Set once a ZK state write has failed. From then on no further state is saved, and terminated()
+ // and failed() return early because stopOnFailure() already arranges the notifyFailed call.
+ private volatile boolean zkFailure = false;
+
+ @Override
+ public void starting() {
+ LOG.info("Starting: " + id);
+ saveState(ServiceController.State.STARTING);
+ }
+
+ @Override
+ public void running() {
+ LOG.info("Running: " + id);
+ notifyStarted();
+ watchMessages();
+ saveState(ServiceController.State.RUNNING);
+ }
+
+ @Override
+ public void stopping(State from) {
+ LOG.info("Stopping: " + id);
+ saveState(ServiceController.State.STOPPING);
+ }
+
+ /**
+ * Removes the live node and the service node, then reports this decorator as stopped, or as failed
+ * if either removal failed.
+ */
+ @Override
+ public void terminated(State from) {
+ LOG.info("Terminated: " + from + " " + id);
+ if (zkFailure) {
+ return;
+ }
+
+ ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
+ final ListenableFuture<List<String>> future = Futures.allAsList(futures);
+ // The successfulAsList future is used for the listener, while get() on the allAsList future
+ // rethrows a removal failure, if there was one.
+ Futures.successfulAsList(futures).addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ LOG.info("Service and state node removed");
+ notifyStopped();
+ } catch (Exception e) {
+ LOG.warn("Failed to remove ZK nodes.", e);
+ notifyFailed(e);
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
+ * Removes the live node and the service node, then reports the original failure of the decorated
+ * service. The outcome of the removals is not inspected.
+ */
+ @Override
+ public void failed(State from, final Throwable failure) {
+ LOG.info("Failed: {} {}.", from, id, failure);
+ if (zkFailure) {
+ return;
+ }
+
+ ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
+ Futures.successfulAsList(futures).addListener(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Service and state node removed");
+ notifyFailed(failure);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
+ * Writes the given state to the "state" node unless an earlier ZK write has already failed.
+ */
+ private void saveState(ServiceController.State state) {
+ if (zkFailure) {
+ return;
+ }
+ StateNode stateNode = new StateNode(state);
+ stopOnFailure(zkClient.setData(getZKPath("state"), encodeStateNode(stateNode)));
+ }
+
+ /**
+ * Watches the given ZK operation. If it fails, marks the ZK failure, stops the decorated service
+ * and reports the failure on this decorator once the stop future completes.
+ */
+ private <V> void stopOnFailure(final OperationFuture<V> future) {
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ } catch (final Exception e) {
+ LOG.error("ZK operation failed", e);
+ zkFailure = true;
+ decoratedService.stop().addListener(new Runnable() {
+ @Override
+ public void run() {
+ notifyFailed(e);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ }
+
+ /**
+ * Chains a stop of the given service onto the given future through Futures.transform.
+ *
+ * @return a future for the state returned by {@code service.stop()}
+ */
+ private <V> ListenableFuture<State> stopServiceOnComplete(ListenableFuture <V> future, final Service service) {
+ return Futures.transform(future, new AsyncFunction<V, State>() {
+ @Override
+ public ListenableFuture<State> apply(V input) throws Exception {
+ return service.stop();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
new file mode 100644
index 0000000..07d4c1d
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.internal.Arguments;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.io.InputSupplier;
+import com.google.common.io.OutputSupplier;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Gson codec for {@link Arguments}. The JSON form is an object with an "arguments" member and a
+ * "runnableArguments" member, the latter being the map view of the runnable arguments multimap.
+ */
+public final class ArgumentsCodec implements JsonSerializer<Arguments>, JsonDeserializer<Arguments> {
+
+  private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Arguments.class, new ArgumentsCodec())
+                                                    .create();
+
+  /**
+   * Writes the given arguments as JSON to the writer obtained from the supplier.
+   * The writer is always closed before this method returns.
+   */
+  public static void encode(Arguments arguments, OutputSupplier<? extends Writer> writerSupplier) throws IOException {
+    Writer out = writerSupplier.getOutput();
+    try {
+      GSON.toJson(arguments, out);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Reads arguments as JSON from the reader obtained from the supplier.
+   * The reader is always closed before this method returns.
+   */
+  public static Arguments decode(InputSupplier<? extends Reader> readerSupplier) throws IOException {
+    Reader in = readerSupplier.getInput();
+    try {
+      return GSON.fromJson(in, Arguments.class);
+    } finally {
+      in.close();
+    }
+  }
+
+  @Override
+  public JsonElement serialize(Arguments src, Type typeOfSrc,
+                               JsonSerializationContext context) {
+    JsonObject result = new JsonObject();
+    result.add("arguments", context.serialize(src.getArguments()));
+    result.add("runnableArguments", context.serialize(src.getRunnableArguments().asMap()));
+    return result;
+  }
+
+  @Override
+  public Arguments deserialize(JsonElement json, Type typeOfT,
+                               JsonDeserializationContext context) throws JsonParseException {
+    Type listType = new TypeToken<List<String>>() { }.getType();
+    Type mapType = new TypeToken<Map<String, Collection<String>>>() { }.getType();
+
+    JsonObject fields = json.getAsJsonObject();
+    List<String> arguments = context.deserialize(fields.get("arguments"), listType);
+    Map<String, Collection<String>> runnableArgs = context.deserialize(fields.get("runnableArguments"), mapType);
+
+    // Rebuild the multimap from its map view, one key at a time.
+    ImmutableMultimap.Builder<String, String> multimap = ImmutableMultimap.builder();
+    for (Map.Entry<String, Collection<String>> perRunnable : runnableArgs.entrySet()) {
+      multimap.putAll(perRunnable.getKey(), perRunnable.getValue());
+    }
+    return new Arguments(arguments, multimap.build());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java b/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
new file mode 100644
index 0000000..9556ad8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Collection of helper functions for JSON codecs.
+ */
+public final class JsonUtils {
+
+  private JsonUtils() {
+    // Static helpers only; not meant to be instantiated.
+  }
+
+  /**
+   * Returns a String representation of the given property.
+   *
+   * @param json the object to read from
+   * @param property name of the member to look up
+   * @return {@code null} if the property is absent or is a JSON null; the string value if it is a JSON
+   *         primitive; otherwise the JSON text of the element
+   */
+  public static String getAsString(JsonObject json, String property) {
+    JsonElement jsonElement = json.get(property);
+    // JsonObject.get returns null for a missing member, so check before dereferencing.
+    if (jsonElement == null || jsonElement.isJsonNull()) {
+      return null;
+    }
+    if (jsonElement.isJsonPrimitive()) {
+      return jsonElement.getAsString();
+    }
+    return jsonElement.toString();
+  }
+
+  /**
+   * Returns a long representation of the given property.
+   *
+   * @param json the object to read from
+   * @param property name of the member to look up
+   * @param defaultValue value to return when the property is missing or cannot be read as a long
+   * @return the property value as a long, or {@code defaultValue}
+   */
+  public static long getAsLong(JsonObject json, String property, long defaultValue) {
+    try {
+      return json.get(property).getAsLong();
+    } catch (Exception e) {
+      // Best effort by design: a missing or non-numeric property falls back to the default.
+      return defaultValue;
+    }
+  }
+
+  /**
+   * Returns an int representation of the given property.
+   *
+   * @param json the object to read from
+   * @param property name of the member to look up
+   * @param defaultValue value to return when the property is missing or cannot be read as an int
+   * @return the property value as an int, or {@code defaultValue}
+   */
+  public static int getAsInt(JsonObject json, String property, int defaultValue) {
+    try {
+      return json.get(property).getAsInt();
+    } catch (Exception e) {
+      // Best effort by design: a missing or non-numeric property falls back to the default.
+      return defaultValue;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
new file mode 100644
index 0000000..680a36c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.internal.DefaultLocalFile;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.net.URI;
+
+/**
+ * Gson codec for {@link LocalFile}. The JSON form is an object with the members "name", "uri",
+ * "lastModified", "size", "archive" and "pattern".
+ */
+public final class LocalFileCodec implements JsonSerializer<LocalFile>, JsonDeserializer<LocalFile> {
+
+  @Override
+  public JsonElement serialize(LocalFile src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject result = new JsonObject();
+    result.addProperty("name", src.getName());
+    result.addProperty("uri", src.getURI().toASCIIString());
+    result.addProperty("lastModified", src.getLastModified());
+    result.addProperty("size", src.getSize());
+    result.addProperty("archive", src.isArchive());
+    result.addProperty("pattern", src.getPattern());
+    return result;
+  }
+
+  @Override
+  public LocalFile deserialize(JsonElement json, Type typeOfT,
+                               JsonDeserializationContext context) throws JsonParseException {
+    JsonObject fields = json.getAsJsonObject();
+
+    String name = fields.get("name").getAsString();
+    URI uri = URI.create(fields.get("uri").getAsString());
+    long lastModified = fields.get("lastModified").getAsLong();
+    long size = fields.get("size").getAsLong();
+    boolean archive = fields.get("archive").getAsBoolean();
+
+    // The pattern is optional: a missing member or a JSON null both yield a null pattern.
+    JsonElement patternElement = fields.get("pattern");
+    String pattern = null;
+    if (patternElement != null && !patternElement.isJsonNull()) {
+      pattern = patternElement.getAsString();
+    }
+
+    return new DefaultLocalFile(name, uri, lastModified, size, archive, pattern);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
new file mode 100644
index 0000000..e473fe7
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.TwillRunResources;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.io.Reader;
+import java.io.Writer;
+
+/**
+ * This class provides utility to help encode/decode {@link ResourceReport} to/from Json.
+ */
+public final class ResourceReportAdapter {
+
+ private final Gson gson;
+
+ public static ResourceReportAdapter create() {
+ return new ResourceReportAdapter();
+ }
+
+ private ResourceReportAdapter() {
+ gson = new GsonBuilder()
+ .serializeNulls()
+ .registerTypeAdapter(TwillRunResources.class, new TwillRunResourcesCodec())
+ .registerTypeAdapter(ResourceReport.class, new ResourceReportCodec())
+ .create();
+ }
+
+ public String toJson(ResourceReport report) {
+ return gson.toJson(report, ResourceReport.class);
+ }
+
+ public void toJson(ResourceReport report, Writer writer) {
+ gson.toJson(report, ResourceReport.class, writer);
+ }
+
+ public ResourceReport fromJson(String json) {
+ return gson.fromJson(json, ResourceReport.class);
+ }
+
+ public ResourceReport fromJson(Reader reader) {
+ return gson.fromJson(reader, ResourceReport.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
new file mode 100644
index 0000000..884d889
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.internal.DefaultResourceReport;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Gson codec that converts a {@link ResourceReport} to and from a JSON object with the members
+ * "appMasterId", "appMasterResources" and "runnableResources".
+ */
+public final class ResourceReportCodec implements JsonSerializer<ResourceReport>,
+                                                  JsonDeserializer<ResourceReport> {
+
+  // Generic types handed to the Gson context, built once instead of on every call.
+  private static final Type RUN_RESOURCES_TYPE = new TypeToken<TwillRunResources>() { }.getType();
+  private static final Type RESOURCES_MAP_TYPE =
+    new TypeToken<Map<String, Collection<TwillRunResources>>>() { }.getType();
+
+  @Override
+  public JsonElement serialize(ResourceReport src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject result = new JsonObject();
+    result.addProperty("appMasterId", src.getApplicationId());
+    result.add("appMasterResources", context.serialize(src.getAppMasterResources(), RUN_RESOURCES_TYPE));
+    result.add("runnableResources", context.serialize(src.getResources(), RESOURCES_MAP_TYPE));
+    return result;
+  }
+
+  @Override
+  public ResourceReport deserialize(JsonElement json, Type typeOfT,
+                                    JsonDeserializationContext context) throws JsonParseException {
+    JsonObject fields = json.getAsJsonObject();
+
+    String appMasterId = fields.get("appMasterId").getAsString();
+    TwillRunResources masterResources = context.deserialize(fields.get("appMasterResources"),
+                                                            TwillRunResources.class);
+    Map<String, Collection<TwillRunResources>> runnableResources =
+      context.deserialize(fields.get("runnableResources"), RESOURCES_MAP_TYPE);
+
+    return new DefaultResourceReport(appMasterId, masterResources, runnableResources);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
new file mode 100644
index 0000000..bea73c4
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.internal.DefaultResourceSpecification;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+
+/**
+ *
+ */
+final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecification>,
+ JsonDeserializer<ResourceSpecification> {
+
+ @Override
+ public JsonElement serialize(ResourceSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject json = new JsonObject();
+
+ json.addProperty("cores", src.getVirtualCores());
+ json.addProperty("memorySize", src.getMemorySize());
+ json.addProperty("instances", src.getInstances());
+ json.addProperty("uplink", src.getUplink());
+ json.addProperty("downlink", src.getDownlink());
+
+ return json;
+ }
+
+ @Override
+ public ResourceSpecification deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject jsonObj = json.getAsJsonObject();
+ return new DefaultResourceSpecification(jsonObj.get("cores").getAsInt(),
+ jsonObj.get("memorySize").getAsInt(),
+ jsonObj.get("instances").getAsInt(),
+ jsonObj.get("uplink").getAsInt(),
+ jsonObj.get("downlink").getAsInt());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
new file mode 100644
index 0000000..867f4a8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.internal.DefaultRuntimeSpecification;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.util.Collection;
+
+/**
+ * Gson codec for {@link RuntimeSpecification}. The json object has the members {@code name},
+ * {@code runnable}, {@code resources} and {@code files}.
+ */
+final class RuntimeSpecificationCodec implements JsonSerializer<RuntimeSpecification>,
+                                                 JsonDeserializer<RuntimeSpecification> {
+
+  /**
+   * Writes the name directly and delegates the runnable specification, the resource specification
+   * and the local files to the serialization context.
+   */
+  @Override
+  public JsonElement serialize(RuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+    Type localFilesType = new TypeToken<Collection<LocalFile>>(){}.getType();
+
+    JsonObject result = new JsonObject();
+    result.addProperty("name", src.getName());
+    result.add("runnable", context.serialize(src.getRunnableSpecification(), TwillRunnableSpecification.class));
+    result.add("resources", context.serialize(src.getResourceSpecification(), ResourceSpecification.class));
+    result.add("files", context.serialize(src.getLocalFiles(), localFilesType));
+    return result;
+  }
+
+  /**
+   * Reads the four members back and builds a {@link DefaultRuntimeSpecification} from them.
+   */
+  @Override
+  public RuntimeSpecification deserialize(JsonElement json, Type typeOfT,
+                                          JsonDeserializationContext context) throws JsonParseException {
+    Type localFilesType = new TypeToken<Collection<LocalFile>>(){}.getType();
+    JsonObject source = json.getAsJsonObject();
+
+    String runtimeName = source.get("name").getAsString();
+    TwillRunnableSpecification runnableSpec = context.deserialize(source.get("runnable"),
+                                                                  TwillRunnableSpecification.class);
+    ResourceSpecification resourceSpec = context.deserialize(source.get("resources"),
+                                                             ResourceSpecification.class);
+    Collection<LocalFile> localFiles = context.deserialize(source.get("files"), localFilesType);
+
+    return new DefaultRuntimeSpecification(runtimeName, runnableSpec, resourceSpec, localFiles);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
new file mode 100644
index 0000000..9a57b46
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+
+/**
+ *
+ */
+public final class StackTraceElementCodec implements JsonSerializer<StackTraceElement>,
+ JsonDeserializer<StackTraceElement> {
+
+ @Override
+ public StackTraceElement deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject jsonObj = json.getAsJsonObject();
+ return new StackTraceElement(JsonUtils.getAsString(jsonObj, "className"),
+ JsonUtils.getAsString(jsonObj, "method"),
+ JsonUtils.getAsString(jsonObj, "file"),
+ JsonUtils.getAsInt(jsonObj, "line", -1));
+ }
+
+ @Override
+ public JsonElement serialize(StackTraceElement src, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject jsonObj = new JsonObject();
+ jsonObj.addProperty("className", src.getClassName());
+ jsonObj.addProperty("method", src.getMethodName());
+ jsonObj.addProperty("file", src.getFileName());
+ jsonObj.addProperty("line", src.getLineNumber());
+
+ return jsonObj;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
new file mode 100644
index 0000000..c1e9d1c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.internal.state.StateNode;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+
+/**
+ *
+ */
+public final class StateNodeCodec implements JsonSerializer<StateNode>, JsonDeserializer<StateNode> {
+
+ @Override
+ public StateNode deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject jsonObj = json.getAsJsonObject();
+ ServiceController.State state = ServiceController.State.valueOf(jsonObj.get("state").getAsString());
+ String errorMessage = jsonObj.has("errorMessage") ? jsonObj.get("errorMessage").getAsString() : null;
+
+ return new StateNode(state, errorMessage,
+ context.<StackTraceElement[]>deserialize(jsonObj.get("stackTraces"), StackTraceElement[].class));
+ }
+
+ @Override
+ public JsonElement serialize(StateNode src, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject jsonObj = new JsonObject();
+ jsonObj.addProperty("state", src.getState().name());
+ if (src.getErrorMessage() != null) {
+ jsonObj.addProperty("errorMessage", src.getErrorMessage());
+ }
+ if (src.getStackTraces() != null) {
+ jsonObj.add("stackTraces", context.serialize(src.getStackTraces(), StackTraceElement[].class));
+ }
+ return jsonObj;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
new file mode 100644
index 0000000..8951173
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.internal.DefaultTwillRunResources;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+
+/**
+ * Codec for serializing and deserializing a {@link org.apache.twill.api.TwillRunResources} object using json.
+ */
+public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunResources>,
+ JsonDeserializer<TwillRunResources> {
+
+ @Override
+ public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject json = new JsonObject();
+
+ json.addProperty("containerId", src.getContainerId());
+ json.addProperty("instanceId", src.getInstanceId());
+ json.addProperty("host", src.getHost());
+ json.addProperty("memoryMB", src.getMemoryMB());
+ json.addProperty("virtualCores", src.getVirtualCores());
+
+ return json;
+ }
+
+ @Override
+ public TwillRunResources deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject jsonObj = json.getAsJsonObject();
+ return new DefaultTwillRunResources(jsonObj.get("instanceId").getAsInt(),
+ jsonObj.get("containerId").getAsString(),
+ jsonObj.get("virtualCores").getAsInt(),
+ jsonObj.get("memoryMB").getAsInt(),
+ jsonObj.get("host").getAsString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
new file mode 100644
index 0000000..f37c1e8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.internal.DefaultTwillRunnableSpecification;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * Gson codec for {@link TwillRunnableSpecification}. The json object has the members {@code classname},
+ * {@code name} and {@code arguments}, the latter holding the configs of the specification.
+ */
+final class TwillRunnableSpecificationCodec implements JsonSerializer<TwillRunnableSpecification>,
+                                                       JsonDeserializer<TwillRunnableSpecification> {
+
+  /**
+   * Writes the class name and name directly and delegates the configs map to the serialization context.
+   */
+  @Override
+  public JsonElement serialize(TwillRunnableSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+    Type configsType = new TypeToken<Map<String, String>>(){}.getType();
+
+    JsonObject result = new JsonObject();
+    result.addProperty("classname", src.getClassName());
+    result.addProperty("name", src.getName());
+    result.add("arguments", context.serialize(src.getConfigs(), configsType));
+    return result;
+  }
+
+  /**
+   * Reads the three members back and builds a {@link DefaultTwillRunnableSpecification} from them.
+   */
+  @Override
+  public TwillRunnableSpecification deserialize(JsonElement json, Type typeOfT,
+                                                JsonDeserializationContext context) throws JsonParseException {
+    Type configsType = new TypeToken<Map<String, String>>(){}.getType();
+    JsonObject source = json.getAsJsonObject();
+
+    String runnableClassName = source.get("classname").getAsString();
+    String runnableName = source.get("name").getAsString();
+    Map<String, String> configs = context.deserialize(source.get("arguments"), configsType);
+
+    return new DefaultTwillRunnableSpecification(runnableClassName, runnableName, configs);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
new file mode 100644
index 0000000..67c15a2
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
+import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * Converts {@link TwillSpecification} objects to and from json, using a {@link Gson} instance that has
+ * the codecs of this package registered.
+ */
+public final class TwillSpecificationAdapter {
+
+  private final Gson gson;
+
+  /**
+   * Creates a new adapter instance.
+   */
+  public static TwillSpecificationAdapter create() {
+    return new TwillSpecificationAdapter();
+  }
+
+  private TwillSpecificationAdapter() {
+    gson = new GsonBuilder()
+      .serializeNulls()
+      .registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec())
+      .registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder())
+      .registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder())
+      .registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec())
+      .registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec())
+      .registerTypeAdapter(ResourceSpecification.class, new ResourceSpecificationCodec())
+      .registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+      .registerTypeAdapterFactory(new TwillSpecificationTypeAdapterFactory())
+      .create();
+  }
+
+  /**
+   * Serializes the given specification to a json string.
+   */
+  public String toJson(TwillSpecification spec) {
+    return gson.toJson(spec, TwillSpecification.class);
+  }
+
+  /**
+   * Serializes the given specification as json into the given writer. The writer is not closed.
+   */
+  public void toJson(TwillSpecification spec, Writer writer) {
+    gson.toJson(spec, TwillSpecification.class, writer);
+  }
+
+  /**
+   * Serializes the given specification as UTF-8 encoded json into the given file.
+   *
+   * @throws IOException if the file cannot be opened or closed
+   */
+  public void toJson(TwillSpecification spec, File file) throws IOException {
+    Writer writer = Files.newWriter(file, Charsets.UTF_8);
+    try {
+      toJson(spec, writer);
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Deserializes a specification from the given json string.
+   */
+  public TwillSpecification fromJson(String json) {
+    return gson.fromJson(json, TwillSpecification.class);
+  }
+
+  /**
+   * Deserializes a specification from json read from the given reader. The reader is not closed.
+   */
+  public TwillSpecification fromJson(Reader reader) {
+    return gson.fromJson(reader, TwillSpecification.class);
+  }
+
+  /**
+   * Deserializes a specification from the given UTF-8 encoded json file.
+   *
+   * @throws IOException if the file cannot be opened or closed
+   */
+  public TwillSpecification fromJson(File file) throws IOException {
+    Reader reader = Files.newReader(file, Charsets.UTF_8);
+    try {
+      return fromJson(reader);
+    } finally {
+      reader.close();
+    }
+  }
+
+  // This is to get around gson ignoring of inner class
+  private static final class TwillSpecificationTypeAdapterFactory implements TypeAdapterFactory {
+
+    /**
+     * Returns a map adapter for parameterized {@link Map} types whose key type is {@link String};
+     * returns {@code null} for every other type so that gson looks for another adapter.
+     */
+    @Override
+    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+      if (!Map.class.isAssignableFrom(type.getRawType())) {
+        return null;
+      }
+      // A raw Map, or a non-generic Map subclass such as java.util.Properties, is represented by a plain
+      // Class rather than a ParameterizedType. There are no type arguments to inspect in that case,
+      // so decline the type instead of failing on the cast.
+      Type mapType = type.getType();
+      if (!(mapType instanceof ParameterizedType)) {
+        return null;
+      }
+      // A Map subtype may declare a different number of type parameters than Map itself.
+      Type[] typeArgs = ((ParameterizedType) mapType).getActualTypeArguments();
+      if (typeArgs.length != 2) {
+        return null;
+      }
+      TypeToken<?> keyType = TypeToken.get(typeArgs[0]);
+      TypeToken<?> valueType = TypeToken.get(typeArgs[1]);
+      if (keyType.getRawType() != String.class) {
+        return null;
+      }
+      // Unchecked: the checks above established that T is a Map type with String keys.
+      @SuppressWarnings("unchecked")
+      TypeAdapter<T> adapter = (TypeAdapter<T>) mapAdapter(gson, valueType);
+      return adapter;
+    }
+
+    /**
+     * Creates an adapter that writes a map as a json object, one member per entry, and reads a json object
+     * back into a hash map. Values are handled by the adapter that gson provides for the value type.
+     */
+    private <V> TypeAdapter<Map<String, V>> mapAdapter(Gson gson, TypeToken<V> valueType) {
+      final TypeAdapter<V> valueAdapter = gson.getAdapter(valueType);
+
+      return new TypeAdapter<Map<String, V>>() {
+        @Override
+        public void write(JsonWriter writer, Map<String, V> map) throws IOException {
+          if (map == null) {
+            writer.nullValue();
+            return;
+          }
+          writer.beginObject();
+          for (Map.Entry<String, V> entry : map.entrySet()) {
+            writer.name(entry.getKey());
+            valueAdapter.write(writer, entry.getValue());
+          }
+          writer.endObject();
+        }
+
+        @Override
+        public Map<String, V> read(JsonReader reader) throws IOException {
+          JsonToken token = reader.peek();
+          if (token == JsonToken.NULL) {
+            reader.nextNull();
+            return null;
+          }
+          if (token != JsonToken.BEGIN_OBJECT) {
+            // Consume the unexpected value before giving up on it, so that the reader is positioned
+            // on the next token when control returns to the caller.
+            reader.skipValue();
+            return null;
+          }
+          Map<String, V> map = Maps.newHashMap();
+          reader.beginObject();
+          while (reader.peek() != JsonToken.END_OBJECT) {
+            map.put(reader.nextName(), valueAdapter.read(reader));
+          }
+          reader.endObject();
+          return map;
+        }
+      };
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
new file mode 100644
index 0000000..5d88350
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.DefaultEventHandlerSpecification;
+import org.apache.twill.internal.DefaultTwillSpecification;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Gson serializer and deserializer for {@link TwillSpecification}. The json object has the members
+ * {@code name}, {@code runnables} and {@code orders}, plus {@code handler} when an event handler is set.
+ */
+final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification>,
+                                               JsonDeserializer<TwillSpecification> {
+
+  /**
+   * Writes the name, runnables and orders of the specification; the event handler is written
+   * only when it is non-null.
+   */
+  @Override
+  public JsonElement serialize(TwillSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+    Type runnablesType = new TypeToken<Map<String, RuntimeSpecification>>(){}.getType();
+    Type ordersType = new TypeToken<List<TwillSpecification.Order>>(){}.getType();
+
+    JsonObject result = new JsonObject();
+    result.addProperty("name", src.getName());
+    result.add("runnables", context.serialize(src.getRunnables(), runnablesType));
+    result.add("orders", context.serialize(src.getOrders(), ordersType));
+
+    EventHandlerSpecification handlerSpec = src.getEventHandler();
+    if (handlerSpec != null) {
+      result.add("handler", context.serialize(handlerSpec, EventHandlerSpecification.class));
+    }
+    return result;
+  }
+
+  /**
+   * Builds a {@link DefaultTwillSpecification} from the json object. The event handler is {@code null}
+   * when the {@code handler} member is absent or json null.
+   */
+  @Override
+  public TwillSpecification deserialize(JsonElement json, Type typeOfT,
+                                        JsonDeserializationContext context) throws JsonParseException {
+    Type runnablesType = new TypeToken<Map<String, RuntimeSpecification>>(){}.getType();
+    Type ordersType = new TypeToken<List<TwillSpecification.Order>>(){}.getType();
+    JsonObject source = json.getAsJsonObject();
+
+    String specName = source.get("name").getAsString();
+    Map<String, RuntimeSpecification> runnables = context.deserialize(source.get("runnables"), runnablesType);
+    List<TwillSpecification.Order> orders = context.deserialize(source.get("orders"), ordersType);
+
+    JsonElement handlerJson = source.get("handler");
+    EventHandlerSpecification handlerSpec = (handlerJson == null || handlerJson.isJsonNull())
+      ? null
+      : context.<EventHandlerSpecification>deserialize(handlerJson, EventHandlerSpecification.class);
+
+    return new DefaultTwillSpecification(specName, runnables, orders, handlerSpec);
+  }
+
+  /**
+   * Gson codec for {@link TwillSpecification.Order}, using the members {@code names} and {@code type}.
+   */
+  static final class TwillSpecificationOrderCoder implements JsonSerializer<TwillSpecification.Order>,
+                                                             JsonDeserializer<TwillSpecification.Order> {
+
+    @Override
+    public JsonElement serialize(TwillSpecification.Order src, Type typeOfSrc, JsonSerializationContext context) {
+      Type namesType = new TypeToken<Set<String>>(){}.getType();
+
+      JsonObject result = new JsonObject();
+      result.add("names", context.serialize(src.getNames(), namesType));
+      result.addProperty("type", src.getType().name());
+      return result;
+    }
+
+    @Override
+    public TwillSpecification.Order deserialize(JsonElement json, Type typeOfT,
+                                                JsonDeserializationContext context) throws JsonParseException {
+      Type namesType = new TypeToken<Set<String>>(){}.getType();
+      JsonObject source = json.getAsJsonObject();
+
+      Set<String> runnableNames = context.deserialize(source.get("names"), namesType);
+      String typeName = source.get("type").getAsString();
+      TwillSpecification.Order.Type orderType = TwillSpecification.Order.Type.valueOf(typeName);
+
+      return new DefaultTwillSpecification.DefaultOrder(runnableNames, orderType);
+    }
+  }
+
+  /**
+   * Gson codec for {@link EventHandlerSpecification}, using the members {@code classname} and {@code configs}.
+   */
+  static final class EventHandlerSpecificationCoder implements JsonSerializer<EventHandlerSpecification>,
+                                                               JsonDeserializer<EventHandlerSpecification> {
+
+    @Override
+    public JsonElement serialize(EventHandlerSpecification src, Type typeOfSrc, JsonSerializationContext context) {
+      Type configsType = new TypeToken<Map<String, String>>(){}.getType();
+
+      JsonObject result = new JsonObject();
+      result.addProperty("classname", src.getClassName());
+      result.add("configs", context.serialize(src.getConfigs(), configsType));
+      return result;
+    }
+
+    @Override
+    public EventHandlerSpecification deserialize(JsonElement json, Type typeOfT,
+                                                 JsonDeserializationContext context) throws JsonParseException {
+      Type configsType = new TypeToken<Map<String, String>>(){}.getType();
+      JsonObject source = json.getAsJsonObject();
+
+      String handlerClassName = source.get("classname").getAsString();
+      Map<String, String> handlerConfigs = context.deserialize(source.get("configs"), configsType);
+
+      return new DefaultEventHandlerSpecification(handlerClassName, handlerConfigs);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
new file mode 100644
index 0000000..14dfc70
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ *
+ */
+public final class EmbeddedKafkaServer extends AbstractIdleService {
+
+ private static final String KAFAK_CONFIG_CLASS = "kafka.server.KafkaConfig";
+ private static final String KAFKA_SERVER_CLASS = "kafka.server.KafkaServerStartable";
+
+ private final Object server;
+
+ public EmbeddedKafkaServer(File kafkaDir, Properties properties) {
+ this(createClassLoader(kafkaDir), properties);
+ }
+
+ public EmbeddedKafkaServer(ClassLoader classLoader, Properties properties) {
+ try {
+ Class<?> configClass = classLoader.loadClass(KAFAK_CONFIG_CLASS);
+ Object config = configClass.getConstructor(Properties.class).newInstance(properties);
+
+ Class<?> serverClass = classLoader.loadClass(KAFKA_SERVER_CLASS);
+ server = serverClass.getConstructor(configClass).newInstance(config);
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ server.getClass().getMethod("startup").invoke(server);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ server.getClass().getMethod("shutdown").invoke(server);
+ server.getClass().getMethod("awaitShutdown").invoke(server);
+ }
+
+ private static ClassLoader createClassLoader(File kafkaDir) {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader thisClassLoader = EmbeddedKafkaServer.class.getClassLoader();
+ ClassLoader parent = contextClassLoader != null
+ ? contextClassLoader
+ : thisClassLoader != null
+ ? thisClassLoader : ClassLoader.getSystemClassLoader();
+
+ return new URLClassLoader(findJars(kafkaDir, Lists.<URL>newArrayList()).toArray(new URL[0]), parent);
+ }
+
+ private static List<URL> findJars(File dir, List<URL> urls) {
+ try {
+ for (File file : dir.listFiles()) {
+ if (file.isDirectory()) {
+ findJars(file, urls);
+ } else if (file.getName().endsWith(".jar")) {
+ urls.add(file.toURI().toURL());
+ }
+ }
+ return urls;
+ } catch (MalformedURLException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
new file mode 100644
index 0000000..a9c3381
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.base.Throwables;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A base implementation of {@link MessageSetEncoder} that compresses the messages added to it.
+ * Each added payload is encoded as an uncompressed message and written through the compressed
+ * stream supplied by the subclass; {@link #finish()} wraps the compressed bytes into a single
+ * message tagged with the compression code.
+ */
+abstract class AbstractCompressedMessageSetEncoder extends AbstractMessageSetEncoder {
+
+  private final Compression compression;
+  // Backing buffer that receives the compressed bytes; reused across message sets.
+  private final ChannelBufferOutputStream os;
+  private OutputStream compressedOutput;
+
+
+  /**
+   * @param compression the compression type recorded in the message produced by {@link #finish()}
+   */
+  protected AbstractCompressedMessageSetEncoder(Compression compression) {
+    this.compression = compression;
+    this.os = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer());
+    try {
+      this.compressedOutput = createCompressedStream(os);
+    } catch (IOException e) {
+      // Should never happen
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Encodes the payload as an uncompressed message and writes it to the compressed stream.
+   *
+   * @return this encoder
+   */
+  @Override
+  public final MessageSetEncoder add(ChannelBuffer payload) {
+    try {
+      ChannelBuffer encoded = encodePayload(payload);
+      encoded.readBytes(compressedOutput, encoded.readableBytes());
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+    return this;
+
+  }
+
+  /**
+   * Completes the current message set and prepares this encoder for the next one.
+   *
+   * @return a length-prefixed buffer containing one message whose payload is the compressed data
+   */
+  @Override
+  public final ChannelBuffer finish() {
+    try {
+      // Close the compressed stream so that all pending output reaches the backing buffer.
+      compressedOutput.close();
+
+      // Copy the compressed bytes out: the backing buffer is reused for the next message set, so
+      // a view over it would be overwritten by later writes.
+      ChannelBuffer compressed = ChannelBuffers.copiedBuffer(os.buffer());
+      ChannelBuffer buf = prefixLength(encodePayload(compressed, compression));
+
+      // Reset the backing buffer BEFORE opening the next stream, so that anything the new stream
+      // writes on construction (for example a format header) is kept, as in the constructor.
+      os.buffer().clear();
+      compressedOutput = createCompressedStream(os);
+
+      return buf;
+
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+
+  }
+
+  /**
+   * Creates the stream that compresses everything written to it into the given output stream.
+   */
+  protected abstract OutputStream createCompressedStream(OutputStream os) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
new file mode 100644
index 0000000..9955d6a
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.zip.CRC32;
+
+/**
+ * A base implementation of {@link MessageSetEncoder} providing the shared message framing helpers.
+ */
+abstract class AbstractMessageSetEncoder implements MessageSetEncoder {
+
+  // One CRC32 instance per thread; it is reset before every use.
+  private static final ThreadLocal<CRC32> CRC32_LOCAL = new ThreadLocal<CRC32>() {
+    @Override
+    protected CRC32 initialValue() {
+      return new CRC32();
+    }
+  };
+
+  /**
+   * Computes the CRC32 of the readable bytes of the given buffer without moving its indices.
+   */
+  protected final int computeCRC32(ChannelBuffer buffer) {
+    CRC32 checksum = CRC32_LOCAL.get();
+    checksum.reset();
+
+    int start = buffer.readerIndex();
+    int length = buffer.readableBytes();
+
+    if (!buffer.hasArray()) {
+      // No backing array available: copy the readable bytes out first.
+      byte[] copy = new byte[length];
+      buffer.getBytes(start, copy);
+      checksum.update(copy);
+    } else {
+      checksum.update(buffer.array(), buffer.arrayOffset() + start, length);
+    }
+    return (int) checksum.getValue();
+  }
+
+  /**
+   * Encodes the payload as an uncompressed message.
+   */
+  protected final ChannelBuffer encodePayload(ChannelBuffer payload) {
+    return encodePayload(payload, Compression.NONE);
+  }
+
+  /**
+   * Prepends a message header to the payload. The header holds a 4-byte message length, a 1-byte
+   * magic number, a 1-byte compression code (only when compression is not NONE) and the 4-byte
+   * CRC32 of the payload.
+   */
+  protected final ChannelBuffer encodePayload(ChannelBuffer payload, Compression compression) {
+    boolean compressed = compression != Compression.NONE;
+    // Magic number = 0 for non-compressed data, 1 otherwise
+    int magic = compressed ? 1 : 0;
+    int crc = computeCRC32(payload);
+
+    ChannelBuffer header = ChannelBuffers.buffer(10);
+    // Message length = 1 byte magic + (optional 1 compression byte) + 4 bytes crc + payload length
+    header.writeInt(5 + magic + payload.readableBytes());
+    header.writeByte(magic);
+    if (compressed) {
+      header.writeByte(compression.getCode());
+    }
+    header.writeInt(crc);
+
+    return ChannelBuffers.wrappedBuffer(header, payload);
+  }
+
+  /**
+   * Prepends a 4-byte integer holding the number of readable bytes of the given buffer.
+   */
+  protected final ChannelBuffer prefixLength(ChannelBuffer buffer) {
+    ChannelBuffer lengthPrefix = ChannelBuffers.buffer(4);
+    lengthPrefix.writeInt(buffer.readableBytes());
+    return ChannelBuffers.wrappedBuffer(lengthPrefix, buffer);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
new file mode 100644
index 0000000..286bf82
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchedMessage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A simple {@link FetchedMessage} that returns the offset and buffer it was constructed with.
+ */
+final class BasicFetchedMessage implements FetchedMessage {
+
+  private final ByteBuffer buffer;
+  private final long offset;
+
+  /**
+   * @param offset value returned by {@link #getOffset()}
+   * @param buffer value returned as-is (no copy is made) by {@link #getBuffer()}
+   */
+  BasicFetchedMessage(long offset, ByteBuffer buffer) {
+    this.buffer = buffer;
+    this.offset = offset;
+  }
+
+  @Override
+  public long getOffset() {
+    return this.offset;
+  }
+
+  @Override
+  public ByteBuffer getBuffer() {
+    return this.buffer;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
new file mode 100644
index 0000000..c1fb4f2
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A class to help buffering data of format [len][payload-of-len], where len is a 4-byte integer.
+ */
+final class Bufferer {
+
+  private ChannelBuffer currentBuffer = null;
+  // Length of the payload currently being awaited, or -1 if the length has not been read yet.
+  private int currentSize = -1;
+
+  /**
+   * Appends the given buffer to the data buffered so far.
+   */
+  void apply(ChannelBuffer buffer) {
+    currentBuffer = concatBuffer(currentBuffer, buffer);
+  }
+
+  /**
+   * Returns the next payload if it has been completely buffered,
+   * otherwise returns {@link ChannelBuffers#EMPTY_BUFFER}.
+   */
+  ChannelBuffer getNext() {
+    if (currentBuffer == null) {
+      // Nothing has been applied yet, so there is nothing to consume.
+      return ChannelBuffers.EMPTY_BUFFER;
+    }
+
+    if (currentSize < 0) {
+      if (currentBuffer.readableBytes() < 4) {
+        return ChannelBuffers.EMPTY_BUFFER;
+      }
+      currentSize = currentBuffer.readInt();
+    }
+
+    // Keep buffering if less than the required number of bytes
+    if (currentBuffer.readableBytes() < currentSize) {
+      return ChannelBuffers.EMPTY_BUFFER;
+    }
+
+    ChannelBuffer result = currentBuffer.readSlice(currentSize);
+    // The payload has been handed out; the next call starts with a fresh length prefix.
+    currentSize = -1;
+
+    return result;
+  }
+
+  private ChannelBuffer concatBuffer(ChannelBuffer current, ChannelBuffer buffer) {
+    return current == null ? buffer : ChannelBuffers.wrappedBuffer(current, buffer);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
new file mode 100644
index 0000000..3355b9f
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Enum for indicating compression method. Each constant carries the integer code that
+ * identifies it.
+ */
+public enum Compression {
+  NONE(0),
+  GZIP(1),
+  SNAPPY(2);
+
+  private final int code;
+
+  Compression(int code) {
+    this.code = code;
+  }
+
+  /**
+   * Returns the integer code of this compression method.
+   */
+  public int getCode() {
+    return code;
+  }
+
+  /**
+   * Returns the compression method that has the given code.
+   *
+   * @param code the integer code to look up
+   * @return the constant whose {@link #getCode()} equals the given code
+   * @throws IllegalArgumentException if no constant has the given code
+   */
+  public static Compression fromCode(int code) {
+    // Look the code up from the constants themselves so the mapping is declared only once.
+    for (Compression compression : values()) {
+      if (compression.code == code) {
+        return compression;
+      }
+    }
+    throw new IllegalArgumentException("Unknown compression code: " + code);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
new file mode 100644
index 0000000..c2865ba
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.collect.Maps;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.ChannelGroupFutureListener;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+
+import java.net.InetSocketAddress;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Provides netty socket connection reuse. Released connections are kept in a per-address queue
+ * and handed out again by {@link #connect(InetSocketAddress)} while they are still connected.
+ */
+final class ConnectionPool {
+
+  private final ClientBootstrap bootstrap;
+  private final ChannelGroup channelGroup;
+  private final ConcurrentMap<InetSocketAddress, Queue<ChannelFuture>> connections;
+
+  /**
+   * For releasing a connection back to the pool.
+   */
+  interface ConnectionReleaser {
+    void release();
+  }
+
+  /**
+   * Result of a connect request.
+   */
+  interface ConnectResult extends ConnectionReleaser {
+    ChannelFuture getChannelFuture();
+  }
+
+  /**
+   * @param bootstrap the bootstrap used to open new connections
+   */
+  ConnectionPool(ClientBootstrap bootstrap) {
+    this.bootstrap = bootstrap;
+    this.channelGroup = new DefaultChannelGroup();
+    this.connections = Maps.newConcurrentMap();
+  }
+
+  /**
+   * Returns a connection to the given address. A pooled connection is reused if its connect
+   * succeeded and its channel is still connected; pooled entries that fail this check are dropped.
+   * If none qualifies, a new connection is opened through the bootstrap.
+   */
+  ConnectResult connect(InetSocketAddress address) {
+    Queue<ChannelFuture> pooled = queueFor(address);
+
+    for (ChannelFuture candidate = pooled.poll(); candidate != null; candidate = pooled.poll()) {
+      if (candidate.isSuccess() && candidate.getChannel().isConnected()) {
+        return new SimpleConnectResult(address, candidate);
+      }
+    }
+
+    ChannelFuture fresh = bootstrap.connect(address);
+    fresh.addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        // Only successfully connected channels are tracked for closing.
+        if (future.isSuccess()) {
+          channelGroup.add(future.getChannel());
+        }
+      }
+    });
+    return new SimpleConnectResult(address, fresh);
+  }
+
+  /**
+   * Closes all channels in the channel group and, once that completes, releases the external
+   * resources of the bootstrap.
+   */
+  ChannelGroupFuture close() {
+    ChannelGroupFuture closeFuture = channelGroup.close();
+    closeFuture.addListener(new ChannelGroupFutureListener() {
+      @Override
+      public void operationComplete(ChannelGroupFuture future) throws Exception {
+        bootstrap.releaseExternalResources();
+      }
+    });
+    return closeFuture;
+  }
+
+  /**
+   * Returns the queue of pooled connections for the given address, creating it if absent.
+   */
+  private Queue<ChannelFuture> queueFor(InetSocketAddress address) {
+    Queue<ChannelFuture> existing = connections.get(address);
+    if (existing != null) {
+      return existing;
+    }
+    Queue<ChannelFuture> created = new ConcurrentLinkedQueue<ChannelFuture>();
+    Queue<ChannelFuture> raced = connections.putIfAbsent(address, created);
+    // If another thread registered a queue first, use theirs.
+    return raced != null ? raced : created;
+  }
+
+  private final class SimpleConnectResult implements ConnectResult {
+
+    private final InetSocketAddress address;
+    private final ChannelFuture future;
+
+
+    private SimpleConnectResult(InetSocketAddress address, ChannelFuture future) {
+      this.address = address;
+      this.future = future;
+    }
+
+    @Override
+    public ChannelFuture getChannelFuture() {
+      return future;
+    }
+
+    @Override
+    public void release() {
+      // A connection whose connect did not succeed is not returned to the pool.
+      if (!future.isSuccess()) {
+        return;
+      }
+      connections.get(address).offer(future);
+    }
+  }
+}