You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:41 UTC
[25/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
deleted file mode 100644
index e48a226..0000000
--- a/core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.LocalFile;
-
-import java.util.Map;
-
-/**
- * Interface for launching a container process.
- *
- * @param <T> Type of the object that contains information about the container that the process is going to launch.
- */
-public interface ProcessLauncher<T> {
-
- /**
- * Returns information about the container that this launcher would launch the process in.
- */
- T getContainerInfo();
-
- /**
- * Returns a preparer with the given default set of environments, resources and credentials.
- *
- * @param <C> Type of the credentials object.
- */
- <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
- Iterable<LocalFile> resources, C credentials);
-
- /**
- * For setting up the launcher. The nested interfaces form a staged builder: resources first,
- * then environment, then one or more commands (each followed by its stdout and stderr setting),
- * and finally {@code launch()}.
- */
- interface PrepareLaunchContext {
-
- ResourcesAdder withResources();
-
- AfterResources noResources();
-
- // Stage for adding a local file; the result allows adding more files or moving on.
- interface ResourcesAdder {
- MoreResources add(LocalFile localFile);
- }
-
- // Stage reached once resources are decided; either add environment entries or skip them.
- interface AfterResources {
- EnvironmentAdder withEnvironment();
-
- AfterEnvironment noEnvironment();
- }
-
- // Stage for adding one environment entry; the result allows adding more or moving on.
- interface EnvironmentAdder {
- <V> MoreEnvironment add(String key, V value);
- }
-
- interface MoreEnvironment extends EnvironmentAdder, AfterEnvironment {
- }
-
- // Stage reached once the environment is decided; the only next step is adding commands.
- interface AfterEnvironment {
- CommandAdder withCommands();
- }
-
- interface MoreResources extends ResourcesAdder, AfterResources { }
-
- // Adds a command with its arguments; the stdout and stderr settings for it must follow.
- interface CommandAdder {
- StdOutSetter add(String cmd, String...args);
- }
-
- interface StdOutSetter {
- StdErrSetter redirectOutput(String stdout);
-
- StdErrSetter noOutput();
- }
-
- interface StdErrSetter {
- MoreCommand redirectError(String stderr);
-
- MoreCommand noError();
- }
-
- // After a complete command, either add another command or launch.
- interface MoreCommand extends CommandAdder {
- <R> ProcessController<R> launch();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java b/core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
deleted file mode 100644
index a52afe1..0000000
--- a/core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.TwillApplication;
-import org.apache.twill.api.TwillRunnable;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.TwillSpecification;
-
-/**
- * A {@link TwillApplication} made of exactly one {@link TwillRunnable}. The application takes its
- * name from the specification the runnable produces.
- */
-public class SingleRunnableApplication implements TwillApplication {
-
- private final TwillRunnable runnable;
- private final ResourceSpecification resourceSpec;
-
- /**
- * @param runnable the single runnable of this application
- * @param resourceSpec the resource specification registered together with the runnable
- */
- public SingleRunnableApplication(TwillRunnable runnable, ResourceSpecification resourceSpec) {
- this.runnable = runnable;
- this.resourceSpec = resourceSpec;
- }
-
- /**
- * Builds a specification with one runnable, no local files and no ordering constraint.
- * The runnable's configured name is used both as the application name and the runnable name.
- */
- @Override
- public TwillSpecification configure() {
- final TwillRunnableSpecification spec = runnable.configure();
- return TwillSpecification.Builder.with()
- .setName(spec.getName())
- .withRunnable().add(spec.getName(), runnable, resourceSpec)
- .noLocalFiles()
- .anyOrder()
- .build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/TwillContainerController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/TwillContainerController.java b/core/src/main/java/org/apache/twill/internal/TwillContainerController.java
deleted file mode 100644
index 8b090bd..0000000
--- a/core/src/main/java/org/apache/twill/internal/TwillContainerController.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.internal.state.Message;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- * A {@link ServiceController} that allows sending a message directly. Internal use only.
- */
-public interface TwillContainerController extends ServiceController {
-
- /**
- * Sends the given message through this controller.
- *
- * @param message the message to send
- * @return a future yielding a {@link Message}; NOTE(review): presumably completed once the message
- * has been processed - confirm against the implementation
- */
- ListenableFuture<Message> sendMessage(Message message);
-
- /**
- * Called to indicate that the container that this controller is associated with is completed.
- * Any resources it holds will be released and all pending futures will be cancelled.
- *
- * @param exitStatus the exit status of the completed container
- */
- void completed(int exitStatus);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
deleted file mode 100644
index 63f8732..0000000
--- a/core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.launcher.TwillLauncher;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * This class helps launching a container.
- */
-public final class TwillContainerLauncher {
-
- private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class);
-
- private static final double HEAP_MIN_RATIO = 0.7d;
-
- private final RuntimeSpecification runtimeSpec;
- private final ProcessLauncher.PrepareLaunchContext launchContext;
- private final ZKClient zkClient;
- private final int instanceCount;
- private final String jvmOpts;
- private final int reservedMemory;
- private final Location secureStoreLocation;
-
- /**
- * Creates a launcher for containers of one runnable.
- *
- * @param runtimeSpec supplies the runnable name, its local files and its memory size
- * @param launchContext the context used to add resources, environment and the launch command
- * @param zkClient ZooKeeper client handed to the controller of each launched container
- * @param instanceCount value exported to the container as the instance count
- * @param jvmOpts extra argument placed on the {@code java} command line
- * @param reservedMemory amount subtracted from the container memory when computing {@code -Xmx}
- * @param secureStoreLocation location localized as the credentials file when it is non-null and exists
- */
- public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ProcessLauncher.PrepareLaunchContext launchContext,
- ZKClient zkClient, int instanceCount, String jvmOpts, int reservedMemory,
- Location secureStoreLocation) {
- this.runtimeSpec = runtimeSpec;
- this.launchContext = launchContext;
- this.zkClient = zkClient;
- this.instanceCount = instanceCount;
- this.jvmOpts = jvmOpts;
- this.reservedMemory = reservedMemory;
- this.secureStoreLocation = secureStoreLocation;
- }
-
- /**
- * Launches one container process for the runnable and returns a started controller for it.
- *
- * The local files of the runtime specification, plus the secure store when it exists, are
- * localized to the container. The heap size is the container memory minus the reserved memory,
- * unless that would fall below {@code HEAP_MIN_RATIO} of the container memory, in which case
- * the ratio is applied instead.
- *
- * @param runId run id exported to the container and used for the controller
- * @param instanceId instance id exported to the container
- * @param mainClass class whose name is passed to the {@link TwillLauncher}
- * @param classPath class path appended after the launcher jar
- * @return the controller of the launched container, already started
- */
- public TwillContainerController start(RunId runId, int instanceId, Class<?> mainClass, String classPath) {
- ProcessLauncher.PrepareLaunchContext.AfterResources afterResources = null;
- ProcessLauncher.PrepareLaunchContext.ResourcesAdder resourcesAdder = null;
-
- // Adds all file to be localized to container
- if (!runtimeSpec.getLocalFiles().isEmpty()) {
- resourcesAdder = launchContext.withResources();
-
- for (LocalFile localFile : runtimeSpec.getLocalFiles()) {
- afterResources = resourcesAdder.add(localFile);
- }
- }
-
- // Optionally localize secure store.
- try {
- if (secureStoreLocation != null && secureStoreLocation.exists()) {
- if (resourcesAdder == null) {
- resourcesAdder = launchContext.withResources();
- }
- afterResources = resourcesAdder.add(new DefaultLocalFile(Constants.Files.CREDENTIALS,
- secureStoreLocation.toURI(),
- secureStoreLocation.lastModified(),
- secureStoreLocation.length(), false, null));
- }
- } catch (IOException e) {
- // Best effort: the launch goes on without the secure store, but the cause is kept in the log.
- LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation.toURI(), e);
- }
-
- // Nothing was added above, so explicitly pick the no-resources path of the builder.
- if (afterResources == null) {
- afterResources = launchContext.noResources();
- }
-
- int memory = runtimeSpec.getResourceSpecification().getMemorySize();
- if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) {
- // Reduce -Xmx by the reserved memory size.
- memory = runtimeSpec.getResourceSpecification().getMemorySize() - reservedMemory;
- } else {
- // If it is a small VM, just discount it by the min ratio.
- memory = (int) Math.ceil(memory * HEAP_MIN_RATIO);
- }
-
- // Currently no reporting is supported for runnable containers
- ProcessController<Void> processController = afterResources
- .withEnvironment()
- .add(EnvKeys.TWILL_RUN_ID, runId.getId())
- .add(EnvKeys.TWILL_RUNNABLE_NAME, runtimeSpec.getName())
- .add(EnvKeys.TWILL_INSTANCE_ID, Integer.toString(instanceId))
- .add(EnvKeys.TWILL_INSTANCE_COUNT, Integer.toString(instanceCount))
- .withCommands()
- .add("java",
- "-Djava.io.tmpdir=tmp",
- "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
- "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME,
- "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
- "-Xmx" + memory + "m",
- jvmOpts,
- TwillLauncher.class.getName(),
- Constants.Files.CONTAINER_JAR,
- mainClass.getName(),
- Boolean.TRUE.toString())
- .redirectOutput(Constants.STDOUT).redirectError(Constants.STDERR)
- .launch();
-
- TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, processController);
- controller.start();
- return controller;
- }
-
- /**
- * Controller of a launched container. The ZooKeeper node callbacks are no-ops here; killing
- * the container cancels the underlying process controller.
- */
- private static final class TwillContainerControllerImpl extends AbstractZKServiceController
- implements TwillContainerController {
-
- private final ProcessController<Void> processController;
-
- protected TwillContainerControllerImpl(ZKClient zkClient, RunId runId,
- ProcessController<Void> processController) {
- super(runId, zkClient);
- this.processController = processController;
- }
-
- @Override
- protected void doStartUp() {
- // No-op
- }
-
- @Override
- protected void doShutDown() {
- // No-op
- }
-
- @Override
- protected void instanceNodeUpdated(NodeData nodeData) {
- // No-op
- }
-
- @Override
- protected void stateNodeUpdated(StateNode stateNode) {
- // No-op
- }
-
- // Delegates to the two-argument sendMessage, passing the message itself as the second argument.
- @Override
- public ListenableFuture<Message> sendMessage(Message message) {
- return sendMessage(message, message);
- }
-
- // NOTE(review): the non-zero exit branch is currently empty because its only statement is
- // commented out, so every exit status ends in the same forceShutDown() call.
- @Override
- public synchronized void completed(int exitStatus) {
- if (exitStatus != 0) { // If a container terminated with exit code != 0, treat it as error
-// fireStateChange(new StateNode(State.FAILED, new StackTraceElement[0]));
- }
- forceShutDown();
- }
-
- @Override
- public void kill() {
- processController.cancel();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ZKMessages.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ZKMessages.java b/core/src/main/java/org/apache/twill/internal/ZKMessages.java
deleted file mode 100644
index 03575dd..0000000
--- a/core/src/main/java/org/apache/twill/internal/ZKMessages.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCodec;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.zookeeper.CreateMode;
-
-/**
- * Static helpers for sending a {@link Message} by creating a message node in ZooKeeper.
- */
-public final class ZKMessages {
-
- /**
- * Creates a message node in zookeeper. The message node created is a PERSISTENT_SEQUENTIAL node.
- *
- * @param zkClient The ZooKeeper client for interacting with ZooKeeper.
- * @param messagePathPrefix ZooKeeper path prefix for the message node.
- * @param message The {@link Message} object for the content of the message node.
- * @param completionResult Object to set to the result future when the message is processed.
- * @param <V> Type of the completion result.
- * @return A {@link ListenableFuture} that will be completed when the message is consumed, which is indicated
- * by deletion of the node. If there is an exception during the process, it will be reflected
- * in the future returned.
- */
- public static <V> ListenableFuture<V> sendMessage(final ZKClient zkClient, String messagePathPrefix,
- Message message, final V completionResult) {
- SettableFuture<V> result = SettableFuture.create();
- sendMessage(zkClient, messagePathPrefix, message, result, completionResult);
- return result;
- }
-
- /**
- * Creates a message node in zookeeper. The message node created is a PERSISTENT_SEQUENTIAL node.
- *
- * @param zkClient The ZooKeeper client for interacting with ZooKeeper.
- * @param messagePathPrefix ZooKeeper path prefix for the message node.
- * @param message The {@link Message} object for the content of the message node.
- * @param completion A {@link SettableFuture} to reflect the result of message process completion.
- * @param completionResult Object to set to the result future when the message is processed.
- * @param <V> Type of the completion result.
- */
- public static <V> void sendMessage(final ZKClient zkClient, String messagePathPrefix, Message message,
- final SettableFuture<V> completion, final V completionResult) {
-
- // Creates a message and watch for its deletion for completion.
- Futures.addCallback(zkClient.create(messagePathPrefix, MessageCodec.encode(message),
- CreateMode.PERSISTENT_SEQUENTIAL), new FutureCallback<String>() {
- @Override
- public void onSuccess(String path) {
- // The node exists now; its deletion is what completes the caller's future.
- Futures.addCallback(ZKOperations.watchDeleted(zkClient, path), new FutureCallback<String>() {
- @Override
- public void onSuccess(String result) {
- completion.set(completionResult);
- }
-
- @Override
- public void onFailure(Throwable t) {
- completion.setException(t);
- }
- });
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Creating the node failed, so the message was never sent.
- completion.setException(t);
- }
- });
- }
-
- // Static helpers only; not instantiable.
- private ZKMessages() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java b/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
deleted file mode 100644
index 7313d33..0000000
--- a/core/src/main/java/org/apache/twill/internal/ZKServiceDecorator.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.common.ServiceListenerAdapter;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.json.StateNodeCodec;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCallback;
-import org.apache.twill.internal.state.MessageCodec;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.internal.state.SystemMessages;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKOperations;
-import com.google.common.base.Charsets;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Service;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * A {@link Service} decorator that wrap another {@link Service} with the service states reflected
- * to ZooKeeper.
- */
-public final class ZKServiceDecorator extends AbstractService {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecorator.class);
-
- private final ZKClient zkClient;
- private final RunId id;
- private final Supplier<? extends JsonElement> liveNodeData;
- private final Service decoratedService;
- private final MessageCallbackCaller messageCallback;
- private ExecutorService callbackExecutor;
-
-
- /**
- * Creates a ZKServiceDecorator without a finalizer; delegates to the five-argument constructor
- * with a {@code null} finalizer.
- */
- public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier<? extends JsonElement> liveNodeData,
- Service decoratedService) {
- this(zkClient, id, liveNodeData, decoratedService, null);
- }
-
- /**
- * Creates a ZKServiceDecorator.
- *
- * @param zkClient ZooKeeper client
- * @param id The run id of the service
- * @param liveNodeData A supplier for the content written to the live node.
- * @param decoratedService The Service whose state changes are monitored
- * @param finalizer An optional Runnable to run when this decorator has terminated or failed.
- */
- public ZKServiceDecorator(ZKClient zkClient, RunId id, Supplier<? extends JsonElement> liveNodeData,
- Service decoratedService, @Nullable Runnable finalizer) {
- this.zkClient = zkClient;
- this.id = id;
- this.liveNodeData = liveNodeData;
- this.decoratedService = decoratedService;
- // Messages reach the decorated service only when it is itself a MessageCallback.
- this.messageCallback = (decoratedService instanceof MessageCallback)
- ? new MessageCallbackCaller((MessageCallback) decoratedService, zkClient)
- : new MessageCallbackCaller(zkClient);
- if (finalizer == null) {
- return;
- }
- addFinalizer(finalizer);
- }
-
- /**
- * Deletes the given ZK path recursively and creates the path again with the given data and mode.
- * The delete is wrapped with {@code ZKOperations.ignoreError} for {@code NoNodeException},
- * presumably so that a path that does not exist yet does not fail the chain.
- *
- * @return the future of the create operation that is chained after the delete
- */
- private ListenableFuture<String> deleteAndCreate(final String path, final byte[] data, final CreateMode mode) {
- return Futures.transform(ZKOperations.ignoreError(ZKOperations.recursiveDelete(zkClient, path),
- KeeperException.NoNodeException.class, null),
- new AsyncFunction<String, String>() {
- @Override
- public ListenableFuture<String> apply(String input) throws Exception {
- return zkClient.create(path, data, mode);
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
-
- /**
- * Starts this decorator: creates the callback executor and the live node, then (re)creates the
- * "messages" and "state" nodes, and only after both exist starts the decorated service.
- * Any failure along the way is reported through {@code notifyFailed}.
- */
- @Override
- protected void doStart() {
- callbackExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("message-callback"));
- Futures.addCallback(createLiveNode(), new FutureCallback<String>() {
- @Override
- public void onSuccess(String result) {
- // Create nodes for states and messaging
- StateNode stateNode = new StateNode(ServiceController.State.STARTING);
-
- final ListenableFuture<List<String>> createFuture = Futures.allAsList(
- deleteAndCreate(getZKPath("messages"), null, CreateMode.PERSISTENT),
- deleteAndCreate(getZKPath("state"), encodeStateNode(stateNode), CreateMode.PERSISTENT)
- );
-
- createFuture.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- // The future is already done here; get() only surfaces a failure of either create.
- createFuture.get();
- // Starts the decorated service
- decoratedService.addListener(createListener(), Threads.SAME_THREAD_EXECUTOR);
- decoratedService.start();
- } catch (Exception e) {
- notifyFailed(e);
- }
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
-
- @Override
- public void onFailure(Throwable t) {
- notifyFailed(t);
- }
- });
- }
-
- /**
- * Stops the decorated service and shuts down the callback executor. This method does not call
- * {@code notifyStopped} itself; that happens in the listener attached to the decorated service.
- */
- @Override
- protected void doStop() {
- // Stops the decorated service
- decoratedService.stop();
- callbackExecutor.shutdownNow();
- }
-
- private void addFinalizer(final Runnable finalizer) {
- addListener(new ServiceListenerAdapter() {
- @Override
- public void terminated(State from) {
- try {
- finalizer.run();
- } catch (Throwable t) {
- LOG.warn("Exception when running finalizer.", t);
- }
- }
-
- @Override
- public void failed(State from, Throwable failure) {
- try {
- finalizer.run();
- } catch (Throwable t) {
- LOG.warn("Exception when running finalizer.", t);
- }
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
-
- private OperationFuture<String> createLiveNode() {
- String liveNode = getLiveNodePath();
- LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNode);
-
- JsonObject content = new JsonObject();
- content.add("data", liveNodeData.get());
- return ZKOperations.ignoreError(zkClient.create(liveNode, encodeJson(content), CreateMode.EPHEMERAL),
- KeeperException.NodeExistsException.class, liveNode);
- }
-
- private OperationFuture<String> removeLiveNode() {
- String liveNode = getLiveNodePath();
- LOG.info("Remove live node {}{}", zkClient.getConnectString(), liveNode);
- return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
- }
-
- private OperationFuture<String> removeServiceNode() {
- String serviceNode = String.format("/%s", id.getId());
- LOG.info("Remove service node {}{}", zkClient.getConnectString(), serviceNode);
- return ZKOperations.recursiveDelete(zkClient, serviceNode);
- }
-
- /**
- * Lists the children of the "messages" node with a watcher and processes every message found,
- * in sorted name order. The watcher calls this method again on a children change, as long as
- * the decorated service is running.
- */
- private void watchMessages() {
- final String messagesPath = getZKPath("messages");
- Futures.addCallback(zkClient.getChildren(messagesPath, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // TODO: Do we need to deal with other type of events?
- if (event.getType() == Event.EventType.NodeChildrenChanged && decoratedService.isRunning()) {
- watchMessages();
- }
- }
- }), new FutureCallback<NodeChildren>() {
- @Override
- public void onSuccess(NodeChildren result) {
- // Sort by the name, which is the messageId. Assumption is that message ids is ordered by time.
- List<String> messages = Lists.newArrayList(result.getChildren());
- Collections.sort(messages);
- for (String messageId : messages) {
- processMessage(messagesPath + "/" + messageId, messageId);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- // TODO: what could be done besides just logging?
- LOG.error("Failed to watch messages.", t);
- }
- });
- }
-
- /**
- * Fetches the content of one message node and dispatches it. A message that cannot be decoded
- * is logged and its node deleted; a stop message is handled by {@code handleStopMessage};
- * every other message goes to the message callback on the callback executor.
- *
- * @param path full ZooKeeper path of the message node
- * @param messageId name of the message node
- */
- private void processMessage(final String path, final String messageId) {
- Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
- @Override
- public void onSuccess(NodeData result) {
- Message message = MessageCodec.decode(result.getData());
- if (message == null) {
- LOG.error("Failed to decode message for " + messageId + " in " + path);
- listenFailure(zkClient.delete(path, result.getStat().getVersion()));
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Message received from " + path + ": " + new String(MessageCodec.encode(message), Charsets.UTF_8));
- }
- // The delete of the message node is deferred through a supplier until the stop has completed.
- if (handleStopMessage(message, getDeleteSupplier(path, result.getStat().getVersion()))) {
- return;
- }
- messageCallback.onReceived(callbackExecutor, path, result.getStat().getVersion(), messageId, message);
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Failed to fetch message content.", t);
- }
- });
- }
-
- /**
- * Handles the system stop command. When the message is one, a task is queued on the callback
- * executor that stops the decorated service; once that stop future completes, the post-handle
- * operation is obtained from the supplier and this decorator is stopped after it.
- *
- * @param message the received message
- * @param postHandleSupplier supplies the operation to run after the decorated service stopped
- * @return {@code true} if the message was the stop command, {@code false} otherwise
- */
- private <V> boolean handleStopMessage(Message message, final Supplier<OperationFuture<V>> postHandleSupplier) {
- if (message.getType() == Message.Type.SYSTEM && SystemMessages.STOP_COMMAND.equals(message.getCommand())) {
- callbackExecutor.execute(new Runnable() {
- @Override
- public void run() {
- decoratedService.stop().addListener(new Runnable() {
-
- @Override
- public void run() {
- stopServiceOnComplete(postHandleSupplier.get(), ZKServiceDecorator.this);
- }
- }, MoreExecutors.sameThreadExecutor());
- }
- });
- return true;
- }
- return false;
- }
-
-
- /**
- * Returns a supplier that, each time it is asked, issues a delete of the given path at the
- * given version. Nothing is deleted until {@code get()} is called.
- */
- private Supplier<OperationFuture<String>> getDeleteSupplier(final String path, final int version) {
- return new Supplier<OperationFuture<String>>() {
- @Override
- public OperationFuture<String> get() {
- return zkClient.delete(path, version);
- }
- };
- }
-
- // Creates the listener that is attached to the decorated service in doStart().
- private Listener createListener() {
- return new DecoratedServiceListener();
- }
-
- /**
- * Serializes the given object to JSON as the given type, using the codecs for
- * {@link StateNode} and {@link StackTraceElement}, and returns the UTF-8 bytes.
- */
- private <V> byte[] encode(V data, Class<? extends V> clz) {
- GsonBuilder builder = new GsonBuilder();
- builder.registerTypeAdapter(StateNode.class, new StateNodeCodec());
- builder.registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec());
- String json = builder.create().toJson(data, clz);
- return json.getBytes(Charsets.UTF_8);
- }
-
- // Encodes a state node to JSON bytes through encode(), typed as StateNode.
- private byte[] encodeStateNode(StateNode stateNode) {
- return encode(stateNode, StateNode.class);
- }
-
- // Serializes a JSON element with a plain Gson instance and returns the UTF-8 bytes.
- private <V extends JsonElement> byte[] encodeJson(V json) {
- return new Gson().toJson(json).getBytes(Charsets.UTF_8);
- }
-
- // Builds "/<id>/<path>" for a node under this service's run id.
- // NOTE(review): this formats `id` directly while removeServiceNode() uses id.getId();
- // presumably both give the same text - confirm against the RunId implementation.
- private String getZKPath(String path) {
- return String.format("/%s/%s", id, path);
- }
-
- // Path of the live node: "/instances/" followed by the string form of the run id.
- private String getLiveNodePath() {
- return "/instances/" + id;
- }
-
- /**
- * Attaches a listener that logs an error if the given operation failed. A cancelled operation
- * is not inspected. The same future is returned so that calls can be chained.
- */
- private static <V> OperationFuture<V> listenFailure(final OperationFuture<V> operationFuture) {
- operationFuture.addListener(new Runnable() {
-
- @Override
- public void run() {
- try {
- if (!operationFuture.isCancelled()) {
- // get() is called only to surface an exception from the finished operation.
- operationFuture.get();
- }
- } catch (Exception e) {
- // TODO: what could be done besides just logging?
- LOG.error("Operation execution failed for " + operationFuture.getRequestPath(), e);
- }
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- return operationFuture;
- }
-
- /**
- * Hands received messages to an optional {@link MessageCallback} and deletes the message node
- * afterwards. Without a callback the message node is simply deleted.
- */
- private static final class MessageCallbackCaller {
- private final MessageCallback callback;
- private final ZKClient zkClient;
-
- private MessageCallbackCaller(ZKClient zkClient) {
- this(null, zkClient);
- }
-
- private MessageCallbackCaller(MessageCallback callback, ZKClient zkClient) {
- this.callback = callback;
- this.zkClient = zkClient;
- }
-
- /**
- * Processes one message.
- *
- * @param executor executor on which the callback is invoked
- * @param path ZooKeeper path of the message node
- * @param version version of the message node used for the delete
- * @param id the message id
- * @param message the decoded message
- */
- public void onReceived(Executor executor, final String path,
- final int version, final String id, final Message message) {
- if (callback == null) {
- // Simply delete the message
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring incoming message from " + path + ": " + message);
- }
- listenFailure(zkClient.delete(path, version));
- return;
- }
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- // Message process is synchronous for now. Making it async needs more thoughts about race conditions.
- // The executor is the callbackExecutor which is a single thread executor.
- callback.onReceived(id, message).get();
- } catch (Throwable t) {
- LOG.error("Exception when processing message: {}, {}, {}", id, message, path, t);
- } finally {
- // The message node is deleted whether or not the callback succeeded.
- listenFailure(zkClient.delete(path, version));
- }
- }
- });
- }
- }
-
- /**
- * Listener on the decorated service that writes each state change to the "state" node and
- * drives the state of the enclosing decorator. Once a ZooKeeper write has failed
- * ({@code zkFailure}), later callbacks skip their ZooKeeper work.
- */
- private final class DecoratedServiceListener implements Listener {
- private volatile boolean zkFailure = false;
-
- @Override
- public void starting() {
- LOG.info("Starting: " + id);
- saveState(ServiceController.State.STARTING);
- }
-
- @Override
- public void running() {
- LOG.info("Running: " + id);
- notifyStarted();
- watchMessages();
- saveState(ServiceController.State.RUNNING);
- }
-
- @Override
- public void stopping(State from) {
- LOG.info("Stopping: " + id);
- saveState(ServiceController.State.STOPPING);
- }
-
- @Override
- public void terminated(State from) {
- LOG.info("Terminated: " + from + " " + id);
- if (zkFailure) {
- return;
- }
-
- ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
- final ListenableFuture<List<String>> future = Futures.allAsList(futures);
- // Wait on successfulAsList so that both removals have finished, then use the allAsList
- // future to find out whether any of them failed.
- Futures.successfulAsList(futures).addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- LOG.info("Service and state node removed");
- notifyStopped();
- } catch (Exception e) {
- LOG.warn("Failed to remove ZK nodes.", e);
- notifyFailed(e);
- }
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
-
- @Override
- public void failed(State from, final Throwable failure) {
- LOG.info("Failed: {} {}.", from, id, failure);
- if (zkFailure) {
- return;
- }
-
- ImmutableList<OperationFuture<String>> futures = ImmutableList.of(removeLiveNode(), removeServiceNode());
- // NOTE(review): the outcome of the removals is not checked here, so the message below is
- // logged even if a removal failed.
- Futures.successfulAsList(futures).addListener(new Runnable() {
- @Override
- public void run() {
- LOG.info("Service and state node removed");
- notifyFailed(failure);
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
-
- // Writes the given state to the "state" node unless a ZooKeeper failure was already seen.
- private void saveState(ServiceController.State state) {
- if (zkFailure) {
- return;
- }
- StateNode stateNode = new StateNode(state);
- stopOnFailure(zkClient.setData(getZKPath("state"), encodeStateNode(stateNode)));
- }
-
- // If the operation failed: records the ZooKeeper failure, stops the decorated service and,
- // once that stop future completes, marks this decorator as failed.
- private <V> void stopOnFailure(final OperationFuture<V> future) {
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- } catch (final Exception e) {
- LOG.error("ZK operation failed", e);
- zkFailure = true;
- decoratedService.stop().addListener(new Runnable() {
- @Override
- public void run() {
- notifyFailed(e);
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
- }
-
- /**
- * Chains a stop of the given service after the given future through {@code Futures.transform}.
- * NOTE(review): the transform function presumably runs only when the future succeeds, so a
- * failed future would not trigger the stop - confirm this is intended.
- *
- * @return the future of the chained {@code service.stop()} call
- */
- private <V> ListenableFuture<State> stopServiceOnComplete(ListenableFuture <V> future, final Service service) {
- return Futures.transform(future, new AsyncFunction<V, State>() {
- @Override
- public ListenableFuture<State> apply(V input) throws Exception {
- return service.stop();
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java b/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
deleted file mode 100644
index 07d4c1d..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.internal.Arguments;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.io.InputSupplier;
-import com.google.common.io.OutputSupplier;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Gson codec for {@link Arguments}, with static helpers that encode to a Writer and decode from a Reader.
- */
-public final class ArgumentsCodec implements JsonSerializer<Arguments>, JsonDeserializer<Arguments> {
-
- private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Arguments.class, new ArgumentsCodec())
- .create(); // shared Gson instance with this codec registered for Arguments
-
- public static void encode(Arguments arguments, OutputSupplier<? extends Writer> writerSupplier) throws IOException {
- Writer writer = writerSupplier.getOutput();
- try {
- GSON.toJson(arguments, writer);
- } finally {
- writer.close(); // the writer obtained from the supplier is always closed here
- }
- }
-
-
- public static Arguments decode(InputSupplier<? extends Reader> readerSupplier) throws IOException {
- Reader reader = readerSupplier.getInput();
- try {
- return GSON.fromJson(reader, Arguments.class);
- } finally {
- reader.close(); // the reader obtained from the supplier is always closed here
- }
- }
-
- @Override
- public JsonElement serialize(Arguments src, Type typeOfSrc,
- JsonSerializationContext context) {
- JsonObject json = new JsonObject();
- json.add("arguments", context.serialize(src.getArguments()));
- json.add("runnableArguments", context.serialize(src.getRunnableArguments().asMap())); // multimap written as key -> values
-
- return json;
- }
-
- @Override
- public Arguments deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- List<String> arguments = context.deserialize(jsonObj.get("arguments"), new TypeToken<List<String>>() {}.getType());
- Map<String, Collection<String>> args = context.deserialize(jsonObj.get("runnableArguments"),
- new TypeToken<Map<String, Collection<String>>>(){
- }.getType()); // NOTE(review): presumably null when "runnableArguments" is absent - confirm inputs always carry it
-
- ImmutableMultimap.Builder<String, String> builder = ImmutableMultimap.builder();
- for (Map.Entry<String, Collection<String>> entry : args.entrySet()) { // rebuild the multimap from the key -> values map
- builder.putAll(entry.getKey(), entry.getValue());
- }
- return new Arguments(arguments, builder.build());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java b/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
deleted file mode 100644
index 9556ad8..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-/**
- * Collections of helper functions for json codec.
- */
-public final class JsonUtils {
-
- private JsonUtils() {
- }
-
- /**
- * Returns the given property as a String, or null if it is absent or JSON null; non-primitives yield their JSON text.
- */
- public static String getAsString(JsonObject json, String property) {
- JsonElement jsonElement = json.get(property);
- if (jsonElement == null || jsonElement.isJsonNull()) { // get() returns null for a missing member; avoid an NPE
- return null;
- }
- if (jsonElement.isJsonPrimitive()) {
- return jsonElement.getAsString();
- }
- return jsonElement.toString();
- }
-
- /**
- * Returns the given property as a long, or defaultValue if it is missing or cannot be read as a long.
- */
- public static long getAsLong(JsonObject json, String property, long defaultValue) {
- try {
- return json.get(property).getAsLong();
- } catch (Exception e) { // deliberate best-effort: any failure (including a missing property) yields the default
- return defaultValue;
- }
- }
-
- /**
- * Returns the given property as an int, or defaultValue if it is missing or cannot be read as an int.
- */
- public static int getAsInt(JsonObject json, String property, int defaultValue) {
- try {
- return json.get(property).getAsInt();
- } catch (Exception e) { // deliberate best-effort: any failure (including a missing property) yields the default
- return defaultValue;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java b/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
deleted file mode 100644
index 680a36c..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/LocalFileCodec.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.internal.DefaultLocalFile;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.net.URI;
-
-/**
- * Gson codec for {@link LocalFile}; deserialization produces a {@link DefaultLocalFile}.
- */
-public final class LocalFileCodec implements JsonSerializer<LocalFile>, JsonDeserializer<LocalFile> {
-
- @Override // Writes name, uri, lastModified, size, archive and pattern as properties of one JSON object.
- public JsonElement serialize(LocalFile src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
-
- json.addProperty("name", src.getName());
- json.addProperty("uri", src.getURI().toASCIIString());
- json.addProperty("lastModified", src.getLastModified());
- json.addProperty("size", src.getSize());
- json.addProperty("archive", src.isArchive());
- json.addProperty("pattern", src.getPattern());
-
- return json;
- }
-
- @Override // Reads the properties written by serialize; only "pattern" is allowed to be absent or JSON null.
- public LocalFile deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
-
- String name = jsonObj.get("name").getAsString();
- URI uri = URI.create(jsonObj.get("uri").getAsString());
- long lastModified = jsonObj.get("lastModified").getAsLong();
- long size = jsonObj.get("size").getAsLong();
- boolean archive = jsonObj.get("archive").getAsBoolean();
- JsonElement pattern = jsonObj.get("pattern");
-
- return new DefaultLocalFile(name, uri, lastModified, size,
- archive, (pattern == null || pattern.isJsonNull()) ? null : pattern.getAsString()); // missing/null pattern -> null
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java b/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
deleted file mode 100644
index e473fe7..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/ResourceReportAdapter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.TwillRunResources;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-import java.io.Reader;
-import java.io.Writer;
-
-/**
- * This class provides utility to help encode/decode {@link ResourceReport} to/from Json.
- */
-public final class ResourceReportAdapter {
-
- private final Gson gson; // configured once in the constructor and used by every toJson/fromJson overload
-
- public static ResourceReportAdapter create() { // factory; the constructor is private
- return new ResourceReportAdapter();
- }
-
- private ResourceReportAdapter() {
- gson = new GsonBuilder()
- .serializeNulls() // null-valued properties are written out rather than omitted
- .registerTypeAdapter(TwillRunResources.class, new TwillRunResourcesCodec())
- .registerTypeAdapter(ResourceReport.class, new ResourceReportCodec())
- .create();
- }
-
- public String toJson(ResourceReport report) { // serializes the report to a JSON string
- return gson.toJson(report, ResourceReport.class);
- }
-
- public void toJson(ResourceReport report, Writer writer) { // serializes the report to the writer; the writer is not closed here
- gson.toJson(report, ResourceReport.class, writer);
- }
-
- public ResourceReport fromJson(String json) { // parses a report from a JSON string
- return gson.fromJson(json, ResourceReport.class);
- }
-
- public ResourceReport fromJson(Reader reader) { // parses a report from the reader; the reader is not closed here
- return gson.fromJson(reader, ResourceReport.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java b/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
deleted file mode 100644
index 884d889..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/ResourceReportCodec.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.internal.DefaultResourceReport;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-import com.google.gson.reflect.TypeToken;
-
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Codec for serializing and deserializing a {@link ResourceReport} object using json.
- */
-public final class ResourceReportCodec implements JsonSerializer<ResourceReport>,
- JsonDeserializer<ResourceReport> {
-
- @Override // Writes appMasterId, appMasterResources and runnableResources; nested values go through the context.
- public JsonElement serialize(ResourceReport src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
-
- json.addProperty("appMasterId", src.getApplicationId()); // the JSON key is "appMasterId" but holds the application id
- json.add("appMasterResources", context.serialize(
- src.getAppMasterResources(), new TypeToken<TwillRunResources>(){}.getType()));
- json.add("runnableResources", context.serialize(
- src.getResources(), new TypeToken<Map<String, Collection<TwillRunResources>>>(){}.getType()));
-
- return json;
- }
-
- @Override // Reads the three properties written by serialize and builds a DefaultResourceReport from them.
- public ResourceReport deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- String appMasterId = jsonObj.get("appMasterId").getAsString();
- TwillRunResources masterResources = context.deserialize(
- jsonObj.get("appMasterResources"), TwillRunResources.class);
- Map<String, Collection<TwillRunResources>> resources = context.deserialize(
- jsonObj.get("runnableResources"), new TypeToken<Map<String, Collection<TwillRunResources>>>(){}.getType());
-
- return new DefaultResourceReport(appMasterId, masterResources, resources);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
deleted file mode 100644
index bea73c4..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.internal.DefaultResourceSpecification;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-
-/**
- * Gson codec for {@link ResourceSpecification}; deserialization produces a {@link DefaultResourceSpecification}.
- */
-final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecification>,
- JsonDeserializer<ResourceSpecification> {
-
- @Override // Writes cores, memorySize, instances, uplink and downlink as properties of one JSON object.
- public JsonElement serialize(ResourceSpecification src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
-
- json.addProperty("cores", src.getVirtualCores()); // the JSON key is "cores"; the value comes from getVirtualCores()
- json.addProperty("memorySize", src.getMemorySize());
- json.addProperty("instances", src.getInstances());
- json.addProperty("uplink", src.getUplink());
- json.addProperty("downlink", src.getDownlink());
-
- return json;
- }
-
- @Override // Reads all five properties as ints; each is dereferenced directly, so all five must be present.
- public ResourceSpecification deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- return new DefaultResourceSpecification(jsonObj.get("cores").getAsInt(),
- jsonObj.get("memorySize").getAsInt(),
- jsonObj.get("instances").getAsInt(),
- jsonObj.get("uplink").getAsInt(),
- jsonObj.get("downlink").getAsInt());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
deleted file mode 100644
index 867f4a8..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/RuntimeSpecificationCodec.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.internal.DefaultRuntimeSpecification;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.util.Collection;
-
-/**
- * Gson codec for {@link RuntimeSpecification}; deserialization produces a {@link DefaultRuntimeSpecification}.
- */
-final class RuntimeSpecificationCodec implements JsonSerializer<RuntimeSpecification>,
- JsonDeserializer<RuntimeSpecification> {
-
- @Override // Writes name, runnable, resources and files; the nested values are serialized through the context.
- public JsonElement serialize(RuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
- json.addProperty("name", src.getName());
- json.add("runnable", context.serialize(src.getRunnableSpecification(), TwillRunnableSpecification.class));
- json.add("resources", context.serialize(src.getResourceSpecification(), ResourceSpecification.class));
- json.add("files", context.serialize(src.getLocalFiles(), new TypeToken<Collection<LocalFile>>(){}.getType()));
-
- return json;
- }
-
- @Override // Reads the four properties written by serialize, delegating the nested values to the context.
- public RuntimeSpecification deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
-
- String name = jsonObj.get("name").getAsString();
- TwillRunnableSpecification runnable = context.deserialize(jsonObj.get("runnable"),
- TwillRunnableSpecification.class);
- ResourceSpecification resources = context.deserialize(jsonObj.get("resources"),
- ResourceSpecification.class);
- Collection<LocalFile> files = context.deserialize(jsonObj.get("files"),
- new TypeToken<Collection<LocalFile>>(){}.getType());
-
- return new DefaultRuntimeSpecification(name, runnable, resources, files);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java b/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
deleted file mode 100644
index 9a57b46..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/StackTraceElementCodec.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-
-/**
- * Gson codec for {@link StackTraceElement}, stored as the properties className, method, file and line.
- */
-public final class StackTraceElementCodec implements JsonSerializer<StackTraceElement>,
- JsonDeserializer<StackTraceElement> {
-
- @Override // Rebuilds the element via the JsonUtils helpers.
- public StackTraceElement deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- return new StackTraceElement(JsonUtils.getAsString(jsonObj, "className"),
- JsonUtils.getAsString(jsonObj, "method"),
- JsonUtils.getAsString(jsonObj, "file"),
- JsonUtils.getAsInt(jsonObj, "line", -1)); // -1 is used when "line" cannot be read
- }
-
- @Override // Writes class name, method name, file name and line number as properties of one JSON object.
- public JsonElement serialize(StackTraceElement src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject jsonObj = new JsonObject();
- jsonObj.addProperty("className", src.getClassName());
- jsonObj.addProperty("method", src.getMethodName());
- jsonObj.addProperty("file", src.getFileName());
- jsonObj.addProperty("line", src.getLineNumber());
-
- return jsonObj;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java b/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
deleted file mode 100644
index c1e9d1c..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/StateNodeCodec.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.internal.state.StateNode;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-
-/**
- * Gson codec for {@link StateNode}: a required "state" plus optional "errorMessage" and "stackTraces".
- */
-public final class StateNodeCodec implements JsonSerializer<StateNode>, JsonDeserializer<StateNode> {
-
- @Override // Parses "state" by enum name; "errorMessage" is null when the property is absent.
- public StateNode deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- ServiceController.State state = ServiceController.State.valueOf(jsonObj.get("state").getAsString());
- String errorMessage = jsonObj.has("errorMessage") ? jsonObj.get("errorMessage").getAsString() : null;
-
- return new StateNode(state, errorMessage,
- context.<StackTraceElement[]>deserialize(jsonObj.get("stackTraces"), StackTraceElement[].class));
- }
-
- @Override // Always writes "state"; "errorMessage" and "stackTraces" are written only when non-null.
- public JsonElement serialize(StateNode src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject jsonObj = new JsonObject();
- jsonObj.addProperty("state", src.getState().name());
- if (src.getErrorMessage() != null) {
- jsonObj.addProperty("errorMessage", src.getErrorMessage());
- }
- if (src.getStackTraces() != null) {
- jsonObj.add("stackTraces", context.serialize(src.getStackTraces(), StackTraceElement[].class));
- }
- return jsonObj;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
deleted file mode 100644
index 8951173..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.internal.DefaultTwillRunResources;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-
-/**
- * Codec for serializing and deserializing a {@link org.apache.twill.api.TwillRunResources} object using json.
- */
-public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunResources>,
- JsonDeserializer<TwillRunResources> {
-
- @Override // Writes containerId, instanceId, host, memoryMB and virtualCores as properties of one JSON object.
- public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
-
- json.addProperty("containerId", src.getContainerId());
- json.addProperty("instanceId", src.getInstanceId());
- json.addProperty("host", src.getHost());
- json.addProperty("memoryMB", src.getMemoryMB());
- json.addProperty("virtualCores", src.getVirtualCores());
-
- return json;
- }
-
- @Override // Reads the five properties; each is dereferenced directly, so all five must be present.
- public TwillRunResources deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- return new DefaultTwillRunResources(jsonObj.get("instanceId").getAsInt(), // argument order differs from the write order
- jsonObj.get("containerId").getAsString(),
- jsonObj.get("virtualCores").getAsInt(),
- jsonObj.get("memoryMB").getAsInt(),
- jsonObj.get("host").getAsString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
deleted file mode 100644
index f37c1e8..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/TwillRunnableSpecificationCodec.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.internal.DefaultTwillRunnableSpecification;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.util.Map;
-
-/**
- * Gson codec for {@link TwillRunnableSpecification}, using the json members "classname", "name" and "arguments".
- */
-final class TwillRunnableSpecificationCodec implements JsonSerializer<TwillRunnableSpecification>,
-                                                       JsonDeserializer<TwillRunnableSpecification> {
-
-  // Generic type of the configs map, which is stored under the "arguments" member.
-  private static final Type CONFIGS_TYPE = new TypeToken<Map<String, String>>() { }.getType();
-
-  /** Writes the class name, the name and the configs of the given specification into a new json object. */
-  @Override
-  public JsonElement serialize(TwillRunnableSpecification src, Type typeOfSrc, JsonSerializationContext context) {
-    JsonObject result = new JsonObject();
-    result.addProperty("classname", src.getClassName());
-    result.addProperty("name", src.getName());
-    result.add("arguments", context.serialize(src.getConfigs(), CONFIGS_TYPE));
-    return result;
-  }
-
-  /** Reads the three members back and builds a {@link DefaultTwillRunnableSpecification} from them. */
-  @Override
-  public TwillRunnableSpecification deserialize(JsonElement json, Type typeOfT,
-                                                JsonDeserializationContext context) throws JsonParseException {
-    JsonObject fields = json.getAsJsonObject();
-    String runnableClass = fields.get("classname").getAsString();
-    String runnableName = fields.get("name").getAsString();
-    Map<String, String> configs = context.deserialize(fields.get("arguments"), CONFIGS_TYPE);
-    return new DefaultTwillRunnableSpecification(runnableClass, runnableName, configs);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
deleted file mode 100644
index 67c15a2..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.TypeAdapter;
-import com.google.gson.TypeAdapterFactory;
-import com.google.gson.reflect.TypeToken;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonToken;
-import com.google.gson.stream.JsonWriter;
-import org.apache.twill.api.EventHandlerSpecification;
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
-import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Map;
-
-/**
- * Converts a {@link TwillSpecification} to and from json, using a {@link Gson} set up with the Twill codecs.
- */
-public final class TwillSpecificationAdapter {
-  private final Gson gson;
-
-  public static TwillSpecificationAdapter create() {
-    return new TwillSpecificationAdapter();
-  }
-
-  private TwillSpecificationAdapter() {
-    gson = new GsonBuilder()
-      .serializeNulls()
-      .registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec())
-      .registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder())
-      .registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder())
-      .registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec())
-      .registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec())
-      .registerTypeAdapter(ResourceSpecification.class, new ResourceSpecificationCodec())
-      .registerTypeAdapter(LocalFile.class, new LocalFileCodec())
-      .registerTypeAdapterFactory(new TwillSpecificationTypeAdapterFactory())
-      .create();
-  }
-
-  /** Returns the json text of the given specification. */
-  public String toJson(TwillSpecification spec) {
-    return gson.toJson(spec, TwillSpecification.class);
-  }
-
-  /** Writes the json form of the given specification to the given writer. */
-  public void toJson(TwillSpecification spec, Writer writer) {
-    gson.toJson(spec, TwillSpecification.class, writer);
-  }
-
-  /** Writes the json form of the given specification to the given file, encoded as UTF-8. */
-  public void toJson(TwillSpecification spec, File file) throws IOException {
-    Writer writer = Files.newWriter(file, Charsets.UTF_8);
-    try {
-      toJson(spec, writer);
-    } finally {
-      writer.close();
-    }
-  }
-
-  /** Parses a specification from the given json text. */
-  public TwillSpecification fromJson(String json) {
-    return gson.fromJson(json, TwillSpecification.class);
-  }
-
-  /** Parses a specification from the json supplied by the given reader. */
-  public TwillSpecification fromJson(Reader reader) {
-    return gson.fromJson(reader, TwillSpecification.class);
-  }
-
-  /** Parses a specification from the given file, decoded as UTF-8. */
-  public TwillSpecification fromJson(File file) throws IOException {
-    Reader reader = Files.newReader(file, Charsets.UTF_8);
-    try {
-      return fromJson(reader);
-    } finally {
-      reader.close();
-    }
-  }
-
-  // This is to get around gson ignoring of inner class. It only handles parameterized maps with String keys.
-  private static final class TwillSpecificationTypeAdapterFactory implements TypeAdapterFactory {
-    @Override
-    @SuppressWarnings("unchecked") // T is a Map with String keys here, as checked before the cast.
-    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
-      // A raw or non-generic Map type carries no type arguments, so leave it to the other gson adapters.
-      if (!Map.class.isAssignableFrom(type.getRawType()) || !(type.getType() instanceof ParameterizedType)) {
-        return null;
-      }
-      Type[] typeArgs = ((ParameterizedType) type.getType()).getActualTypeArguments();
-      if (typeArgs.length != 2 || TypeToken.get(typeArgs[0]).getRawType() != String.class) {
-        return null;
-      }
-      return (TypeAdapter<T>) mapAdapter(gson, TypeToken.get(typeArgs[1]));
-    }
-
-    private <V> TypeAdapter<Map<String, V>> mapAdapter(Gson gson, TypeToken<V> valueType) {
-      final TypeAdapter<V> valueAdapter = gson.getAdapter(valueType);
-      return new TypeAdapter<Map<String, V>>() {
-        @Override
-        public void write(JsonWriter writer, Map<String, V> map) throws IOException {
-          if (map == null) {
-            writer.nullValue();
-            return;
-          }
-          writer.beginObject();
-          for (Map.Entry<String, V> entry : map.entrySet()) {
-            writer.name(entry.getKey());
-            valueAdapter.write(writer, entry.getValue());
-          }
-          writer.endObject();
-        }
-
-        @Override
-        public Map<String, V> read(JsonReader reader) throws IOException {
-          if (reader.peek() != JsonToken.BEGIN_OBJECT) {
-            // Null or unexpected value: consume it so that the reader stays positioned on the next token.
-            reader.skipValue();
-            return null;
-          }
-          Map<String, V> map = Maps.newHashMap();
-          reader.beginObject();
-          while (reader.hasNext()) {
-            map.put(reader.nextName(), valueAdapter.read(reader));
-          }
-          reader.endObject();
-          return map;
-        }
-      };
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java b/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
deleted file mode 100644
index 5d88350..0000000
--- a/core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import org.apache.twill.api.EventHandlerSpecification;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.internal.DefaultEventHandlerSpecification;
-import org.apache.twill.internal.DefaultTwillSpecification;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * An implementation of gson serializer/deserializer {@link org.apache.twill.api.TwillSpecification}.
- */
-final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification>,
- JsonDeserializer<TwillSpecification> {
-
- @Override
- public JsonElement serialize(TwillSpecification src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
- json.addProperty("name", src.getName());
- json.add("runnables", context.serialize(src.getRunnables(),
- new TypeToken<Map<String, RuntimeSpecification>>(){}.getType()));
- json.add("orders", context.serialize(src.getOrders(),
- new TypeToken<List<TwillSpecification.Order>>(){}.getType()));
- EventHandlerSpecification eventHandler = src.getEventHandler();
- if (eventHandler != null) {
- json.add("handler", context.serialize(eventHandler, EventHandlerSpecification.class));
- }
-
- return json;
- }
-
- @Override
- public TwillSpecification deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
-
- String name = jsonObj.get("name").getAsString();
- Map<String, RuntimeSpecification> runnables = context.deserialize(
- jsonObj.get("runnables"), new TypeToken<Map<String, RuntimeSpecification>>(){}.getType());
- List<TwillSpecification.Order> orders = context.deserialize(
- jsonObj.get("orders"), new TypeToken<List<TwillSpecification.Order>>(){}.getType());
-
- JsonElement handler = jsonObj.get("handler");
- EventHandlerSpecification eventHandler = null;
- if (handler != null && !handler.isJsonNull()) {
- eventHandler = context.deserialize(handler, EventHandlerSpecification.class);
- }
-
- return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
- }
-
- static final class TwillSpecificationOrderCoder implements JsonSerializer<TwillSpecification.Order>,
- JsonDeserializer<TwillSpecification.Order> {
-
- @Override
- public JsonElement serialize(TwillSpecification.Order src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
- json.add("names", context.serialize(src.getNames(), new TypeToken<Set<String>>(){}.getType()));
- json.addProperty("type", src.getType().name());
- return json;
- }
-
- @Override
- public TwillSpecification.Order deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
-
- Set<String> names = context.deserialize(jsonObj.get("names"), new TypeToken<Set<String>>(){}.getType());
- TwillSpecification.Order.Type type = TwillSpecification.Order.Type.valueOf(jsonObj.get("type").getAsString());
-
- return new DefaultTwillSpecification.DefaultOrder(names, type);
- }
- }
-
- static final class EventHandlerSpecificationCoder implements JsonSerializer<EventHandlerSpecification>,
- JsonDeserializer<EventHandlerSpecification> {
-
- @Override
- public JsonElement serialize(EventHandlerSpecification src, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject json = new JsonObject();
- json.addProperty("classname", src.getClassName());
- json.add("configs", context.serialize(src.getConfigs(), new TypeToken<Map<String, String>>(){}.getType()));
- return json;
- }
-
- @Override
- public EventHandlerSpecification deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- String className = jsonObj.get("classname").getAsString();
- Map<String, String> configs = context.deserialize(jsonObj.get("configs"),
- new TypeToken<Map<String, String>>() {
- }.getType());
-
- return new DefaultEventHandlerSpecification(className, configs);
- }
- }
-}