You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 23:00:05 UTC
[23/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
deleted file mode 100644
index 12818ef..0000000
--- a/core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.logging;
-
-import org.apache.twill.common.Services;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.kafka.client.Compression;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
-import org.apache.twill.kafka.client.KafkaClient;
-import org.apache.twill.kafka.client.PreparePublish;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-import ch.qos.logback.classic.pattern.ClassOfCallerConverter;
-import ch.qos.logback.classic.pattern.FileOfCallerConverter;
-import ch.qos.logback.classic.pattern.LineOfCallerConverter;
-import ch.qos.logback.classic.pattern.MethodOfCallerConverter;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.classic.spi.IThrowableProxy;
-import ch.qos.logback.classic.spi.StackTraceElementProxy;
-import ch.qos.logback.core.AppenderBase;
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.stream.JsonWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- *
- */
-public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
-
- private final LogEventConverter eventConverter;
- private final AtomicReference<PreparePublish> publisher;
- private final Runnable flushTask;
- /**
- * Rough count of how many entries are being buffered. It's just approximate, not exact.
- */
- private final AtomicInteger bufferedSize;
-
- private ZKClientService zkClientService;
- private KafkaClient kafkaClient;
- private String zkConnectStr;
- private String hostname;
- private String topic;
- private Queue<String> buffer;
- private int flushLimit = 20;
- private int flushPeriod = 100;
- private ScheduledExecutorService scheduler;
-
- public KafkaAppender() {
- eventConverter = new LogEventConverter();
- publisher = new AtomicReference<PreparePublish>();
- flushTask = createFlushTask();
- bufferedSize = new AtomicInteger();
- buffer = new ConcurrentLinkedQueue<String>();
- }
-
- /**
- * Sets the zookeeper connection string. Called by slf4j.
- */
- @SuppressWarnings("unused")
- public void setZookeeper(String zkConnectStr) {
- this.zkConnectStr = zkConnectStr;
- }
-
- /**
- * Sets the hostname. Called by slf4j.
- */
- @SuppressWarnings("unused")
- public void setHostname(String hostname) {
- this.hostname = hostname;
- }
-
- /**
- * Sets the topic name for publishing logs. Called by slf4j.
- */
- @SuppressWarnings("unused")
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- /**
- * Sets the maximum number of cached log entries before performing an force flush. Called by slf4j.
- */
- @SuppressWarnings("unused")
- public void setFlushLimit(int flushLimit) {
- this.flushLimit = flushLimit;
- }
-
- /**
- * Sets the periodic flush time in milliseconds. Called by slf4j.
- */
- @SuppressWarnings("unused")
- public void setFlushPeriod(int flushPeriod) {
- this.flushPeriod = flushPeriod;
- }
-
- @Override
- public void start() {
- Preconditions.checkNotNull(zkConnectStr);
-
- scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
-
- zkClientService = ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
- RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
- kafkaClient = new SimpleKafkaClient(zkClientService);
- Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), new FutureCallback<Object>() {
- @Override
- public void onSuccess(Object result) {
- LOG.info("Kafka client started: " + zkConnectStr);
- publisher.set(kafkaClient.preparePublish(topic, Compression.SNAPPY));
- scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Fail to talk to kafka. Other than logging, what can be done?
- LOG.error("Failed to start kafka client.", t);
- }
- });
-
- super.start();
- }
-
- @Override
- public void stop() {
- super.stop();
- scheduler.shutdownNow();
- Futures.getUnchecked(Services.chainStop(kafkaClient, zkClientService));
- }
-
- public void forceFlush() {
- try {
- publishLogs().get(2, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("Failed to publish last batch of log.", e);
- }
- }
-
- @Override
- protected void append(ILoggingEvent eventObject) {
- buffer.offer(eventConverter.convert(eventObject));
- if (bufferedSize.incrementAndGet() >= flushLimit && publisher.get() != null) {
- // Try to do a extra flush
- scheduler.submit(flushTask);
- }
- }
-
- private ListenableFuture<Integer> publishLogs() {
- // If the publisher is not available, simply returns a completed future.
- PreparePublish publisher = KafkaAppender.this.publisher.get();
- if (publisher == null) {
- return Futures.immediateFuture(0);
- }
-
- int count = 0;
- for (String json : Iterables.consumingIterable(buffer)) {
- publisher.add(Charsets.UTF_8.encode(json), 0);
- count++;
- }
- // Nothing to publish, simply returns a completed future.
- if (count == 0) {
- return Futures.immediateFuture(0);
- }
-
- bufferedSize.set(0);
- final int finalCount = count;
- return Futures.transform(publisher.publish(), new Function<Object, Integer>() {
- @Override
- public Integer apply(Object input) {
- return finalCount;
- }
- });
- }
-
- /**
- * Creates a {@link Runnable} that writes all logs in the buffer into kafka.
- * @return The Runnable task
- */
- private Runnable createFlushTask() {
- return new Runnable() {
- @Override
- public void run() {
- Futures.addCallback(publishLogs(), new FutureCallback<Integer>() {
- @Override
- public void onSuccess(Integer result) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Log entries published, size=" + result);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Failed to push logs to kafka. Log entries dropped.", t);
- }
- });
- }
- };
- }
-
- /**
- * Helper class to convert {@link ILoggingEvent} into json string.
- */
- private final class LogEventConverter {
-
- private final ClassOfCallerConverter classNameConverter = new ClassOfCallerConverter();
- private final MethodOfCallerConverter methodConverter = new MethodOfCallerConverter();
- private final FileOfCallerConverter fileConverter = new FileOfCallerConverter();
- private final LineOfCallerConverter lineConverter = new LineOfCallerConverter();
-
- private String convert(ILoggingEvent event) {
- StringWriter result = new StringWriter();
- JsonWriter writer = new JsonWriter(result);
-
- try {
- try {
- writer.beginObject();
- writer.name("name").value(event.getLoggerName());
- writer.name("host").value(hostname);
- writer.name("timestamp").value(Long.toString(event.getTimeStamp()));
- writer.name("level").value(event.getLevel().toString());
- writer.name("className").value(classNameConverter.convert(event));
- writer.name("method").value(methodConverter.convert(event));
- writer.name("file").value(fileConverter.convert(event));
- writer.name("line").value(lineConverter.convert(event));
- writer.name("thread").value(event.getThreadName());
- writer.name("message").value(event.getFormattedMessage());
- writer.name("stackTraces");
- encodeStackTraces(event.getThrowableProxy(), writer);
-
- writer.endObject();
- } finally {
- writer.close();
- }
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
-
- return result.toString();
- }
-
- private void encodeStackTraces(IThrowableProxy throwable, JsonWriter writer) throws IOException {
- writer.beginArray();
- try {
- if (throwable == null) {
- return;
- }
-
- for (StackTraceElementProxy stackTrace : throwable.getStackTraceElementProxyArray()) {
- writer.beginObject();
-
- StackTraceElement element = stackTrace.getStackTraceElement();
- writer.name("className").value(element.getClassName());
- writer.name("method").value(element.getMethodName());
- writer.name("file").value(element.getFileName());
- writer.name("line").value(element.getLineNumber());
-
- writer.endObject();
- }
- } finally {
- writer.endArray();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java b/core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
deleted file mode 100644
index c1695de..0000000
--- a/core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.logging;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.TwillContext;
-import org.apache.twill.api.TwillRunnable;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.utils.Networks;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * A {@link org.apache.twill.api.TwillRunnable} for managing Kafka server.
- */
-public final class KafkaTwillRunnable implements TwillRunnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTwillRunnable.class);
-
- private final String kafkaDir;
- private EmbeddedKafkaServer server;
- private CountDownLatch stopLatch;
-
- public KafkaTwillRunnable(String kafkaDir) {
- this.kafkaDir = kafkaDir;
- }
-
- @Override
- public TwillRunnableSpecification configure() {
- return TwillRunnableSpecification.Builder.with()
- .setName("kafka")
- .withConfigs(ImmutableMap.of("kafkaDir", kafkaDir))
- .build();
- }
-
- @Override
- public void initialize(TwillContext context) {
- Map<String, String> args = context.getSpecification().getConfigs();
- String zkConnectStr = System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
- stopLatch = new CountDownLatch(1);
-
- try {
- server = new EmbeddedKafkaServer(new File(args.get("kafkaDir")), generateKafkaConfig(zkConnectStr));
- server.startAndWait();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public void handleCommand(Command command) throws Exception {
- }
-
- @Override
- public void stop() {
- stopLatch.countDown();
- }
-
- @Override
- public void destroy() {
- server.stopAndWait();
- }
-
- @Override
- public void run() {
- try {
- stopLatch.await();
- } catch (InterruptedException e) {
- LOG.info("Running thread interrupted, shutting down kafka server.", e);
- }
- }
-
- private Properties generateKafkaConfig(String zkConnectStr) {
- int port = Networks.getRandomPort();
- Preconditions.checkState(port > 0, "Failed to get random port.");
-
- Properties prop = new Properties();
- prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
- prop.setProperty("zk.connect", zkConnectStr);
- prop.setProperty("num.threads", "8");
- prop.setProperty("port", Integer.toString(port));
- prop.setProperty("log.flush.interval", "10000");
- prop.setProperty("max.socket.request.bytes", "104857600");
- prop.setProperty("log.cleanup.interval.mins", "1");
- prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
- prop.setProperty("zk.connectiontimeout.ms", "1000000");
- prop.setProperty("socket.receive.buffer", "1048576");
- prop.setProperty("enable.zookeeper", "true");
- prop.setProperty("log.retention.hours", "168");
- prop.setProperty("brokerid", "0");
- prop.setProperty("socket.send.buffer", "1048576");
- prop.setProperty("num.partitions", "1");
- prop.setProperty("log.file.size", "536870912");
- prop.setProperty("log.default.flush.interval.ms", "1000");
- return prop;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java b/core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
deleted file mode 100644
index dc11666..0000000
--- a/core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.logging;
-
-import org.apache.twill.api.logging.LogEntry;
-import org.apache.twill.internal.json.JsonUtils;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-
-import java.lang.reflect.Type;
-
-/**
- * A {@link com.google.gson.Gson} decoder for {@link LogEntry}.
- */
-public final class LogEntryDecoder implements JsonDeserializer<LogEntry> {
-
- @Override
- public LogEntry deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- if (!json.isJsonObject()) {
- return null;
- }
- JsonObject jsonObj = json.getAsJsonObject();
-
- final String name = JsonUtils.getAsString(jsonObj, "name");
- final String host = JsonUtils.getAsString(jsonObj, "host");
- final long timestamp = JsonUtils.getAsLong(jsonObj, "timestamp", 0);
- LogEntry.Level l;
- try {
- l = LogEntry.Level.valueOf(JsonUtils.getAsString(jsonObj, "level"));
- } catch (Exception e) {
- l = LogEntry.Level.FATAL;
- }
- final LogEntry.Level logLevel = l;
- final String className = JsonUtils.getAsString(jsonObj, "className");
- final String method = JsonUtils.getAsString(jsonObj, "method");
- final String file = JsonUtils.getAsString(jsonObj, "file");
- final String line = JsonUtils.getAsString(jsonObj, "line");
- final String thread = JsonUtils.getAsString(jsonObj, "thread");
- final String message = JsonUtils.getAsString(jsonObj, "message");
-
- final StackTraceElement[] stackTraces = context.deserialize(jsonObj.get("stackTraces").getAsJsonArray(),
- StackTraceElement[].class);
-
- return new LogEntry() {
- @Override
- public String getLoggerName() {
- return name;
- }
-
- @Override
- public String getHost() {
- return host;
- }
-
- @Override
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public Level getLogLevel() {
- return logLevel;
- }
-
- @Override
- public String getSourceClassName() {
- return className;
- }
-
- @Override
- public String getSourceMethodName() {
- return method;
- }
-
- @Override
- public String getFileName() {
- return file;
- }
-
- @Override
- public int getLineNumber() {
- if (line.equals("?")) {
- return -1;
- } else {
- return Integer.parseInt(line);
- }
- }
-
- @Override
- public String getThreadName() {
- return thread;
- }
-
- @Override
- public String getMessage() {
- return message;
- }
-
- @Override
- public StackTraceElement[] getStackTraces() {
- return stackTraces;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/logging/Loggings.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/logging/Loggings.java b/core/src/main/java/org/apache/twill/internal/logging/Loggings.java
deleted file mode 100644
index 9baed63..0000000
--- a/core/src/main/java/org/apache/twill/internal/logging/Loggings.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.logging;
-
-import ch.qos.logback.classic.Logger;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.Appender;
-import org.slf4j.ILoggerFactory;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public final class Loggings {
-
- public static void forceFlush() {
- ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
-
- if (loggerFactory instanceof LoggerContext) {
- Appender<ILoggingEvent> appender = ((LoggerContext) loggerFactory).getLogger(Logger.ROOT_LOGGER_NAME)
- .getAppender("KAFKA");
- if (appender != null && appender instanceof KafkaAppender) {
- ((KafkaAppender) appender).forceFlush();
- }
- }
- }
-
- private Loggings() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/package-info.java b/core/src/main/java/org/apache/twill/internal/package-info.java
deleted file mode 100644
index a8459e0..0000000
--- a/core/src/main/java/org/apache/twill/internal/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package provides internal classes for Twill.
- */
-package org.apache.twill.internal;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/Message.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/Message.java b/core/src/main/java/org/apache/twill/internal/state/Message.java
deleted file mode 100644
index 6c3e719..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/Message.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-
-/**
- *
- */
-public interface Message {
-
- /**
- * Type of message.
- */
- enum Type {
- SYSTEM,
- USER
- }
-
- /**
- * Scope of the message.
- */
- enum Scope {
- APPLICATION,
- ALL_RUNNABLE,
- RUNNABLE
- }
-
- Type getType();
-
- Scope getScope();
-
- /**
- * @return the name of the target runnable if scope is {@link Scope#RUNNABLE} or {@code null} otherwise.
- */
- String getRunnableName();
-
- Command getCommand();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/MessageCallback.java b/core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
deleted file mode 100644
index f94eaa3..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- *
- */
-public interface MessageCallback {
-
- /**
- * Called when a message is received.
- * @param message Message being received.
- * @return A {@link ListenableFuture} that would be completed when message processing is completed or failed.
- * The result of the future should be the input message Id if succeeded.
- */
- ListenableFuture<String> onReceived(String messageId, Message message);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/MessageCodec.java b/core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
deleted file mode 100644
index 176f620..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-import com.google.common.base.Charsets;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.util.Map;
-
-/**
- *
- */
-public final class MessageCodec {
-
- private static final Type OPTIONS_TYPE = new TypeToken<Map<String, String>>() {}.getType();
- private static final Gson GSON = new GsonBuilder()
- .registerTypeAdapter(Message.class, new MessageAdapter())
- .registerTypeAdapter(Command.class, new CommandAdapter())
- .create();
-
- /**
- * Decodes a {@link Message} from the given byte array.
- * @param bytes byte array to be decoded
- * @return Message decoded or {@code null} if fails to decode.
- */
- public static Message decode(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- String content = new String(bytes, Charsets.UTF_8);
- return GSON.fromJson(content, Message.class);
- }
-
- /**
- * Encodes a {@link Message} into byte array. Revserse of {@link #decode(byte[])} method.
- * @param message Message to be encoded
- * @return byte array representing the encoded message.
- */
- public static byte[] encode(Message message) {
- return GSON.toJson(message, Message.class).getBytes(Charsets.UTF_8);
- }
-
- /**
- * Gson codec for {@link Message} object.
- */
- private static final class MessageAdapter implements JsonSerializer<Message>, JsonDeserializer<Message> {
-
- @Override
- public Message deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
-
- Message.Type type = Message.Type.valueOf(jsonObj.get("type").getAsString());
- Message.Scope scope = Message.Scope.valueOf(jsonObj.get("scope").getAsString());
- JsonElement name = jsonObj.get("runnableName");
- String runnableName = (name == null || name.isJsonNull()) ? null : name.getAsString();
- Command command = context.deserialize(jsonObj.get("command"), Command.class);
-
- return new SimpleMessage(type, scope, runnableName, command);
- }
-
- @Override
- public JsonElement serialize(Message message, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject jsonObj = new JsonObject();
- jsonObj.addProperty("type", message.getType().name());
- jsonObj.addProperty("scope", message.getScope().name());
- jsonObj.addProperty("runnableName", message.getRunnableName());
- jsonObj.add("command", context.serialize(message.getCommand(), Command.class));
-
- return jsonObj;
- }
- }
-
- /**
- * Gson codec for {@link Command} object.
- */
- private static final class CommandAdapter implements JsonSerializer<Command>, JsonDeserializer<Command> {
-
- @Override
- public Command deserialize(JsonElement json, Type typeOfT,
- JsonDeserializationContext context) throws JsonParseException {
- JsonObject jsonObj = json.getAsJsonObject();
- return Command.Builder.of(jsonObj.get("command").getAsString())
- .addOptions(context.<Map<String, String>>deserialize(jsonObj.get("options"), OPTIONS_TYPE))
- .build();
- }
-
- @Override
- public JsonElement serialize(Command command, Type typeOfSrc, JsonSerializationContext context) {
- JsonObject jsonObj = new JsonObject();
- jsonObj.addProperty("command", command.getCommand());
- jsonObj.add("options", context.serialize(command.getOptions(), OPTIONS_TYPE));
- return jsonObj;
- }
- }
-
- private MessageCodec() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/Messages.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/Messages.java b/core/src/main/java/org/apache/twill/internal/state/Messages.java
deleted file mode 100644
index 9783d62..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/Messages.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-
-/**
- * Factory class for creating instances of {@link Message}.
- */
-public final class Messages {
-
- /**
- * Creates a {@link Message.Type#USER} type {@link Message} that sends the giving {@link Command} to a
- * particular runnable.
- *
- * @param runnableName Name of the runnable.
- * @param command The user command to send.
- * @return A new instance of {@link Message}.
- */
- public static Message createForRunnable(String runnableName, Command command) {
- return new SimpleMessage(Message.Type.USER, Message.Scope.RUNNABLE, runnableName, command);
- }
-
- /**
- * Creates a {@link Message.Type#USER} type {@link Message} that sends the giving {@link Command} to all
- * runnables.
- *
- * @param command The user command to send.
- * @return A new instance of {@link Message}.
- */
- public static Message createForAll(Command command) {
- return new SimpleMessage(Message.Type.USER, Message.Scope.ALL_RUNNABLE, null, command);
- }
-
- private Messages() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java b/core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
deleted file mode 100644
index e146e56..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-import com.google.common.base.Objects;
-
-/**
- *
- */
-final class SimpleMessage implements Message {
-
- private final Type type;
- private final Scope scope;
- private final String runnableName;
- private final Command command;
-
- SimpleMessage(Type type, Scope scope, String runnableName, Command command) {
- this.type = type;
- this.scope = scope;
- this.runnableName = runnableName;
- this.command = command;
- }
-
- @Override
- public Type getType() {
- return type;
- }
-
- @Override
- public Scope getScope() {
- return scope;
- }
-
- @Override
- public String getRunnableName() {
- return runnableName;
- }
-
- @Override
- public Command getCommand() {
- return command;
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(Message.class)
- .add("type", type)
- .add("scope", scope)
- .add("runnable", runnableName)
- .add("command", command)
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(type, scope, runnableName, command);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof Message)) {
- return false;
- }
- Message other = (Message) obj;
- return type == other.getType()
- && scope == other.getScope()
- && Objects.equal(runnableName, other.getRunnableName())
- && Objects.equal(command, other.getCommand());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/StateNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/StateNode.java b/core/src/main/java/org/apache/twill/internal/state/StateNode.java
deleted file mode 100644
index d66f8a2..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/StateNode.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.ServiceController;
-import com.google.common.util.concurrent.Service;
-
-/**
- *
- */
-public final class StateNode {
-
- private final ServiceController.State state;
- private final String errorMessage;
- private final StackTraceElement[] stackTraces;
-
- /**
- * Constructs a StateNode with the given state.
- */
- public StateNode(ServiceController.State state) {
- this(state, null, null);
- }
-
- /**
- * Constructs a StateNode with {@link ServiceController.State#FAILED} caused by the given error.
- */
- public StateNode(Throwable error) {
- this(Service.State.FAILED, error.getMessage(), error.getStackTrace());
- }
-
- /**
- * Constructs a StateNode with the given state, error and stacktraces.
- * This constructor should only be used by the StateNodeCodec.
- */
- public StateNode(ServiceController.State state, String errorMessage, StackTraceElement[] stackTraces) {
- this.state = state;
- this.errorMessage = errorMessage;
- this.stackTraces = stackTraces;
- }
-
- public ServiceController.State getState() {
- return state;
- }
-
- public String getErrorMessage() {
- return errorMessage;
- }
-
- public StackTraceElement[] getStackTraces() {
- return stackTraces;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder("state=").append(state);
-
- if (errorMessage != null) {
- builder.append("\n").append("error=").append(errorMessage);
- }
- if (stackTraces != null) {
- builder.append("\n");
- for (StackTraceElement stackTrace : stackTraces) {
- builder.append("\tat ").append(stackTrace.toString()).append("\n");
- }
- }
- return builder.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/SystemMessages.java b/core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
deleted file mode 100644
index 9877121..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-import com.google.common.base.Preconditions;
-
-/**
- * Collection of predefined system messages.
- */
-public final class SystemMessages {
-
- public static final Command STOP_COMMAND = Command.Builder.of("stop").build();
- public static final Message SECURE_STORE_UPDATED = new SimpleMessage(
- Message.Type.SYSTEM, Message.Scope.APPLICATION, null, Command.Builder.of("secureStoreUpdated").build());
-
- public static Message stopApplication() {
- return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.APPLICATION, null, STOP_COMMAND);
- }
-
- public static Message stopRunnable(String runnableName) {
- return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName, STOP_COMMAND);
- }
-
- public static Message setInstances(String runnableName, int instances) {
- Preconditions.checkArgument(instances > 0, "Instances should be > 0.");
- return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName,
- Command.Builder.of("instances").addOption("count", Integer.toString(instances)).build());
- }
-
- private SystemMessages() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/utils/Dependencies.java b/core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
deleted file mode 100644
index 015b9f5..0000000
--- a/core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import org.objectweb.asm.AnnotationVisitor;
-import org.objectweb.asm.ClassReader;
-import org.objectweb.asm.ClassVisitor;
-import org.objectweb.asm.FieldVisitor;
-import org.objectweb.asm.Label;
-import org.objectweb.asm.MethodVisitor;
-import org.objectweb.asm.Opcodes;
-import org.objectweb.asm.Type;
-import org.objectweb.asm.signature.SignatureReader;
-import org.objectweb.asm.signature.SignatureVisitor;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.Queue;
-import java.util.Set;
-
-/**
- * Utility class to help find out class dependencies.
- */
-public final class Dependencies {
-
- /**
- * Represents a callback for accepting a class during dependency traversal.
- */
- public interface ClassAcceptor {
- /**
- * Invoked when a class is being found as a dependency.
- *
- * @param className Name of the class.
- * @param classUrl URL for the class resource.
- * @param classPathUrl URL for the class path resource that contains the class resource.
- * If the URL protocol is {@code file}, it would be the path to root package.
- * If the URL protocol is {@code jar}, it would be the jar file.
- * @return true keep finding dependencies on the given class.
- */
- boolean accept(String className, URL classUrl, URL classPathUrl);
- }
-
- public static void findClassDependencies(ClassLoader classLoader,
- ClassAcceptor acceptor,
- String...classesToResolve) throws IOException {
- findClassDependencies(classLoader, acceptor, ImmutableList.copyOf(classesToResolve));
- }
-
- /**
- * Finds the class dependencies of the given class.
- * @param classLoader ClassLoader for finding class bytecode.
- * @param acceptor Predicate to accept a found class and its bytecode.
- * @param classesToResolve Classes for looking for dependencies.
- * @throws IOException Thrown where there is error when loading in class bytecode.
- */
- public static void findClassDependencies(ClassLoader classLoader,
- ClassAcceptor acceptor,
- Iterable<String> classesToResolve) throws IOException {
-
- final Set<String> seenClasses = Sets.newHashSet(classesToResolve);
- final Queue<String> classes = Lists.newLinkedList(classesToResolve);
-
- // Breadth-first-search classes dependencies.
- while (!classes.isEmpty()) {
- String className = classes.remove();
- URL classUrl = getClassURL(className, classLoader);
- if (classUrl == null) {
- continue;
- }
-
- // Call the accept to see if it accept the current class.
- if (!acceptor.accept(className, classUrl, getClassPathURL(className, classUrl))) {
- continue;
- }
-
- InputStream is = classUrl.openStream();
- try {
- // Visit the bytecode to lookup classes that the visiting class is depended on.
- new ClassReader(ByteStreams.toByteArray(is)).accept(new DependencyClassVisitor(new DependencyAcceptor() {
- @Override
- public void accept(String className) {
- // See if the class is accepted
- if (seenClasses.add(className)) {
- classes.add(className);
- }
- }
- }), ClassReader.SKIP_DEBUG + ClassReader.SKIP_FRAMES);
- } finally {
- is.close();
- }
- }
- }
-
- /**
- * Returns the URL for loading the class bytecode of the given class, or null if it is not found or if it is
- * a system class.
- */
- private static URL getClassURL(String className, ClassLoader classLoader) {
- String resourceName = className.replace('.', '/') + ".class";
- return classLoader.getResource(resourceName);
- }
-
- private static URL getClassPathURL(String className, URL classUrl) {
- try {
- if ("file".equals(classUrl.getProtocol())) {
- String path = classUrl.getFile();
- // Compute the directory container the class.
- int endIdx = path.length() - className.length() - ".class".length();
- if (endIdx > 1) {
- // If it is not the root directory, return the end index to remove the trailing '/'.
- endIdx--;
- }
- return new URL("file", "", -1, path.substring(0, endIdx));
- }
- if ("jar".equals(classUrl.getProtocol())) {
- String path = classUrl.getFile();
- return URI.create(path.substring(0, path.indexOf("!/"))).toURL();
- }
- } catch (MalformedURLException e) {
- throw Throwables.propagate(e);
- }
- throw new IllegalStateException("Unsupported class URL: " + classUrl);
- }
-
- /**
- * A private interface for accepting a dependent class that is found during bytecode inspection.
- */
- private interface DependencyAcceptor {
- void accept(String className);
- }
-
- /**
- * ASM ClassVisitor for extracting classes dependencies.
- */
- private static final class DependencyClassVisitor extends ClassVisitor {
-
- private final SignatureVisitor signatureVisitor;
- private final DependencyAcceptor acceptor;
-
- public DependencyClassVisitor(DependencyAcceptor acceptor) {
- super(Opcodes.ASM4);
- this.acceptor = acceptor;
- this.signatureVisitor = new SignatureVisitor(Opcodes.ASM4) {
- private String currentClass;
-
- @Override
- public void visitClassType(String name) {
- currentClass = name;
- addClass(name);
- }
-
- @Override
- public void visitInnerClassType(String name) {
- addClass(currentClass + "$" + name);
- }
- };
- }
-
- @Override
- public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) {
- addClass(name);
-
- if (signature != null) {
- new SignatureReader(signature).accept(signatureVisitor);
- } else {
- addClass(superName);
- addClasses(interfaces);
- }
- }
-
- @Override
- public void visitOuterClass(String owner, String name, String desc) {
- addClass(owner);
- }
-
- @Override
- public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
- addType(Type.getType(desc));
- return null;
- }
-
- @Override
- public void visitInnerClass(String name, String outerName, String innerName, int access) {
- addClass(name);
- }
-
- @Override
- public FieldVisitor visitField(int access, String name, String desc, String signature, Object value) {
- if (signature != null) {
- new SignatureReader(signature).acceptType(signatureVisitor);
- } else {
- addType(Type.getType(desc));
- }
-
- return new FieldVisitor(Opcodes.ASM4) {
- @Override
- public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
- addType(Type.getType(desc));
- return null;
- }
- };
- }
-
- @Override
- public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
- if (signature != null) {
- new SignatureReader(signature).accept(signatureVisitor);
- } else {
- addMethod(desc);
- }
- addClasses(exceptions);
-
- return new MethodVisitor(Opcodes.ASM4) {
- @Override
- public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
- addType(Type.getType(desc));
- return null;
- }
-
- @Override
- public AnnotationVisitor visitParameterAnnotation(int parameter, String desc, boolean visible) {
- addType(Type.getType(desc));
- return null;
- }
-
- @Override
- public void visitTypeInsn(int opcode, String type) {
- addType(Type.getObjectType(type));
- }
-
- @Override
- public void visitFieldInsn(int opcode, String owner, String name, String desc) {
- addType(Type.getObjectType(owner));
- addType(Type.getType(desc));
- }
-
- @Override
- public void visitMethodInsn(int opcode, String owner, String name, String desc) {
- addType(Type.getObjectType(owner));
- addMethod(desc);
- }
-
- @Override
- public void visitLdcInsn(Object cst) {
- if (cst instanceof Type) {
- addType((Type) cst);
- }
- }
-
- @Override
- public void visitMultiANewArrayInsn(String desc, int dims) {
- addType(Type.getType(desc));
- }
-
- @Override
- public void visitLocalVariable(String name, String desc, String signature, Label start, Label end, int index) {
- if (signature != null) {
- new SignatureReader(signature).acceptType(signatureVisitor);
- } else {
- addType(Type.getType(desc));
- }
- }
- };
- }
-
- private void addClass(String internalName) {
- if (internalName == null || internalName.startsWith("java/")) {
- return;
- }
- acceptor.accept(Type.getObjectType(internalName).getClassName());
- }
-
- private void addClasses(String[] classes) {
- if (classes != null) {
- for (String clz : classes) {
- addClass(clz);
- }
- }
- }
-
- private void addType(Type type) {
- if (type.getSort() == Type.ARRAY) {
- type = type.getElementType();
- }
- if (type.getSort() == Type.OBJECT) {
- addClass(type.getInternalName());
- }
- }
-
- private void addMethod(String desc) {
- addType(Type.getReturnType(desc));
- for (Type type : Type.getArgumentTypes(desc)) {
- addType(type);
- }
- }
- }
-
- private Dependencies() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/utils/Instances.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/utils/Instances.java b/core/src/main/java/org/apache/twill/internal/utils/Instances.java
deleted file mode 100644
index 28bfce9..0000000
--- a/core/src/main/java/org/apache/twill/internal/utils/Instances.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import com.google.common.base.Defaults;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.reflect.TypeToken;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-
-/**
- * Utility class to help instantiate object instance from class.
- */
-public final class Instances {
-
- private static final Object UNSAFE;
- private static final Method UNSAFE_NEW_INSTANCE;
-
- static {
- Object unsafe;
- Method newInstance;
- try {
- Class<?> clz = Class.forName("sun.misc.Unsafe");
- Field f = clz.getDeclaredField("theUnsafe");
- f.setAccessible(true);
- unsafe = f.get(null);
-
- newInstance = clz.getMethod("allocateInstance", Class.class);
- } catch (Exception e) {
- unsafe = null;
- newInstance = null;
- }
- UNSAFE = unsafe;
- UNSAFE_NEW_INSTANCE = newInstance;
- }
-
- /**
- * Creates a new instance of the given class. It will use the default constructor if it is presents.
- * Otherwise it will try to use {@link sun.misc.Unsafe#allocateInstance(Class)} to create the instance.
- * @param clz Class of object to be instantiated.
- * @param <T> Type of the class
- * @return An instance of type {@code <T>}
- */
- @SuppressWarnings("unchecked")
- public static <T> T newInstance(Class<T> clz) {
- try {
- try {
- Constructor<T> cons = clz.getDeclaredConstructor();
- if (!cons.isAccessible()) {
- cons.setAccessible(true);
- }
- return cons.newInstance();
- } catch (Exception e) {
- // Try to use Unsafe
- Preconditions.checkState(UNSAFE != null, "Fail to instantiate with Unsafe.");
- return unsafeCreate(clz);
- }
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
-
- /**
- * Creates an instance of the given using Unsafe. It also initialize all fields into default values.
- */
- private static <T> T unsafeCreate(Class<T> clz) throws InvocationTargetException, IllegalAccessException {
- T instance = (T) UNSAFE_NEW_INSTANCE.invoke(UNSAFE, clz);
-
- for (TypeToken<?> type : TypeToken.of(clz).getTypes().classes()) {
- if (Object.class.equals(type.getRawType())) {
- break;
- }
- for (Field field : type.getRawType().getDeclaredFields()) {
- if (Modifier.isStatic(field.getModifiers())) {
- continue;
- }
- if (!field.isAccessible()) {
- field.setAccessible(true);
- }
- field.set(instance, Defaults.defaultValue(field.getType()));
- }
- }
-
- return instance;
- }
-
-
- private Instances() {
- // Protect instantiation of this class
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/utils/Networks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/utils/Networks.java b/core/src/main/java/org/apache/twill/internal/utils/Networks.java
deleted file mode 100644
index 8e7d736..0000000
--- a/core/src/main/java/org/apache/twill/internal/utils/Networks.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-/**
- *
- */
-public final class Networks {
-
- /**
- * Find a random free port in localhost for binding.
- * @return A port number or -1 for failure.
- */
- public static int getRandomPort() {
- try {
- ServerSocket socket = new ServerSocket(0);
- try {
- return socket.getLocalPort();
- } finally {
- socket.close();
- }
- } catch (IOException e) {
- return -1;
- }
- }
-
- private Networks() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/utils/Paths.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/utils/Paths.java b/core/src/main/java/org/apache/twill/internal/utils/Paths.java
deleted file mode 100644
index aeee09f..0000000
--- a/core/src/main/java/org/apache/twill/internal/utils/Paths.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import com.google.common.io.Files;
-
-/**
- *
- */
-public final class Paths {
-
-
- public static String appendSuffix(String extractFrom, String appendTo) {
- String suffix = getExtension(extractFrom);
- if (!suffix.isEmpty()) {
- return appendTo + '.' + suffix;
- }
- return appendTo;
- }
-
- public static String getExtension(String path) {
- if (path.endsWith(".tar.gz")) {
- return "tar.gz";
- }
-
- return Files.getFileExtension(path);
- }
-
- private Paths() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/FetchException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/FetchException.java b/core/src/main/java/org/apache/twill/kafka/client/FetchException.java
deleted file mode 100644
index acccf04..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/FetchException.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-/**
- *
- */
-public final class FetchException extends RuntimeException {
-
- private final ErrorCode errorCode;
-
- public FetchException(String message, ErrorCode errorCode) {
- super(message);
- this.errorCode = errorCode;
- }
-
- public ErrorCode getErrorCode() {
- return errorCode;
- }
-
- @Override
- public String toString() {
- return String.format("%s. Error code: %s", super.toString(), errorCode);
- }
-
- public enum ErrorCode {
- UNKNOWN(-1),
- OK(0),
- OFFSET_OUT_OF_RANGE(1),
- INVALID_MESSAGE(2),
- WRONG_PARTITION(3),
- INVALID_FETCH_SIZE(4);
-
- private final int code;
-
- ErrorCode(int code) {
- this.code = code;
- }
-
- public int getCode() {
- return code;
- }
-
- public static ErrorCode fromCode(int code) {
- switch (code) {
- case -1:
- return UNKNOWN;
- case 0:
- return OK;
- case 1:
- return OFFSET_OUT_OF_RANGE;
- case 2:
- return INVALID_MESSAGE;
- case 3:
- return WRONG_PARTITION;
- case 4:
- return INVALID_FETCH_SIZE;
- }
- throw new IllegalArgumentException("Unknown error code");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java b/core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
deleted file mode 100644
index 65e140f..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import java.nio.ByteBuffer;
-
-/**
- * Represents a message fetched from kafka broker.
- */
-public interface FetchedMessage {
-
- /**
- * Returns the message offset.
- */
- long getOffset();
-
- /**
- * Returns the message payload.
- */
- ByteBuffer getBuffer();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java b/core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
deleted file mode 100644
index 496195b..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import org.apache.twill.internal.kafka.client.Compression;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-
-import java.util.Iterator;
-
-/**
- * This interface provides methods for interacting with kafka broker. It also
- * extends from {@link Service} for lifecycle management. The {@link #start()} method
- * must be called prior to other methods in this class. When instance of this class
- * is not needed, call {@link #stop()}} to release any resources that it holds.
- */
-public interface KafkaClient extends Service {
-
- PreparePublish preparePublish(String topic, Compression compression);
-
- Iterator<FetchedMessage> consume(String topic, int partition, long offset, int maxSize);
-
- /**
- * Fetches offset from the given topic and partition.
- * @param topic Topic to fetch from.
- * @param partition Partition to fetch from.
- * @param time The first offset of every segment file for a given partition with a modified time less than time.
- * {@code -1} for latest offset, {@code -2} for earliest offset.
- * @param maxOffsets Maximum number of offsets to fetch.
- * @return A Future that carry the result as an array of offsets in descending order.
- * The size of the result array would not be larger than maxOffsets. If there is any error during the fetch,
- * the exception will be carried in the exception.
- */
- ListenableFuture<long[]> getOffset(String topic, int partition, long time, int maxOffsets);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java b/core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
deleted file mode 100644
index 5db4abb..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.nio.ByteBuffer;
-
-/**
- * This interface is for preparing to publish a set of messages to kafka.
- */
-public interface PreparePublish {
-
- PreparePublish add(byte[] payload, Object partitionKey);
-
- PreparePublish add(ByteBuffer payload, Object partitionKey);
-
- ListenableFuture<?> publish();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/package-info.java b/core/src/main/java/org/apache/twill/kafka/client/package-info.java
deleted file mode 100644
index ea3bf20..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package provides a pure java Kafka client interface.
- */
-package org.apache.twill.kafka.client;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/launcher/TwillLauncher.java b/core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
deleted file mode 100644
index 2c8c1ef..0000000
--- a/core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.launcher;
-
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
-
-/**
- * A launcher for application from a archive jar.
- * This class should have no dependencies on any library except the J2SE one.
- * This class should not import any thing except java.*
- */
-public final class TwillLauncher {
-
- private static final int TEMP_DIR_ATTEMPTS = 20;
-
- /**
- * Main method to unpackage a jar and run the mainClass.main() method.
- * @param args args[0] is the path to jar file, args[1] is the class name of the mainClass.
- * The rest of args will be passed the mainClass unmodified.
- */
- public static void main(String[] args) throws Exception {
- if (args.length < 3) {
- System.out.println("Usage: java " + TwillLauncher.class.getName() + " [jarFile] [mainClass] [use_classpath]");
- return;
- }
-
- File file = new File(args[0]);
- final File targetDir = createTempDir("twill.launcher");
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- System.out.println("Cleanup directory " + targetDir);
- deleteDir(targetDir);
- }
- });
-
- System.out.println("UnJar " + file + " to " + targetDir);
- unJar(file, targetDir);
-
- // Create ClassLoader
- URLClassLoader classLoader = createClassLoader(targetDir, Boolean.parseBoolean(args[2]));
- Thread.currentThread().setContextClassLoader(classLoader);
-
- System.out.println("Launch class with classpath: " + Arrays.toString(classLoader.getURLs()));
-
- Class<?> mainClass = classLoader.loadClass(args[1]);
- Method mainMethod = mainClass.getMethod("main", String[].class);
- String[] arguments = Arrays.copyOfRange(args, 3, args.length);
- System.out.println("Launching main: " + mainMethod + " " + Arrays.toString(arguments));
- mainMethod.invoke(mainClass, new Object[]{arguments});
- System.out.println("Main class completed.");
-
- System.out.println("Launcher completed");
- }
-
- /**
- * This method is copied from Guava Files.createTempDir().
- */
- private static File createTempDir(String prefix) throws IOException {
- File baseDir = new File(System.getProperty("java.io.tmpdir"));
- if (!baseDir.isDirectory() && !baseDir.mkdirs()) {
- throw new IOException("Tmp directory not exists: " + baseDir.getAbsolutePath());
- }
-
- String baseName = prefix + "-" + System.currentTimeMillis() + "-";
-
- for (int counter = 0; counter < TEMP_DIR_ATTEMPTS; counter++) {
- File tempDir = new File(baseDir, baseName + counter);
- if (tempDir.mkdir()) {
- return tempDir;
- }
- }
- throw new IOException("Failed to create directory within "
- + TEMP_DIR_ATTEMPTS + " attempts (tried "
- + baseName + "0 to " + baseName + (TEMP_DIR_ATTEMPTS - 1) + ')');
- }
-
- private static void unJar(File jarFile, File targetDir) throws IOException {
- JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
- try {
- JarEntry jarEntry = jarInput.getNextJarEntry();
- while (jarEntry != null) {
- File target = new File(targetDir, jarEntry.getName());
- if (jarEntry.isDirectory()) {
- target.mkdirs();
- } else {
- target.getParentFile().mkdirs();
- copy(jarInput, target);
- }
- jarEntry = jarInput.getNextJarEntry();
- }
- } finally {
- jarInput.close();
- }
- }
-
- private static void copy(InputStream is, File file) throws IOException {
- byte[] buf = new byte[8192];
- OutputStream os = new BufferedOutputStream(new FileOutputStream(file));
- try {
- int len = is.read(buf);
- while (len != -1) {
- os.write(buf, 0, len);
- len = is.read(buf);
- }
- } finally {
- os.close();
- }
- }
-
- private static URLClassLoader createClassLoader(File dir, boolean useClassPath) {
- try {
- List<URL> urls = new ArrayList<URL>();
- urls.add(dir.toURI().toURL());
- urls.add(new File(dir, "classes").toURI().toURL());
- urls.add(new File(dir, "resources").toURI().toURL());
-
- File libDir = new File(dir, "lib");
- File[] files = libDir.listFiles();
- if (files != null) {
- for (File file : files) {
- if (file.getName().endsWith(".jar")) {
- urls.add(file.toURI().toURL());
- }
- }
- }
-
- if (useClassPath) {
- InputStream is = ClassLoader.getSystemResourceAsStream("classpath");
- if (is != null) {
- try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
- String line = reader.readLine();
- if (line != null) {
- for (String path : line.split(":")) {
- urls.addAll(getClassPaths(path));
- }
- }
- } finally {
- is.close();
- }
- }
- }
-
- return new URLClassLoader(urls.toArray(new URL[0]));
-
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- private static Collection<URL> getClassPaths(String path) throws MalformedURLException {
- String classpath = expand(path);
- if (classpath.endsWith("/*")) {
- // Grab all .jar files
- File dir = new File(classpath.substring(0, classpath.length() - 2));
- File[] files = dir.listFiles();
- if (files == null || files.length == 0) {
- return singleItem(dir.toURI().toURL());
- }
-
- List<URL> result = new ArrayList<URL>(files.length);
- for (File file : files) {
- if (file.getName().endsWith(".jar")) {
- result.add(file.toURI().toURL());
- }
- }
- return result;
- } else {
- return singleItem(new File(classpath).toURI().toURL());
- }
- }
-
- private static Collection<URL> singleItem(URL url) {
- List<URL> result = new ArrayList<URL>(1);
- result.add(url);
- return result;
- }
-
- private static String expand(String value) {
- String result = value;
- for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
- result = result.replace("$" + entry.getKey(), entry.getValue());
- result = result.replace("${" + entry.getKey() + "}", entry.getValue());
- }
- return result;
- }
-
- private static void deleteDir(File dir) {
- File[] files = dir.listFiles();
- if (files == null || files.length == 0) {
- dir.delete();
- return;
- }
- for (File file : files) {
- deleteDir(file);
- }
- dir.delete();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/resources/kafka-0.7.2.tgz
----------------------------------------------------------------------
diff --git a/core/src/main/resources/kafka-0.7.2.tgz b/core/src/main/resources/kafka-0.7.2.tgz
deleted file mode 100644
index 24178d9..0000000
Binary files a/core/src/main/resources/kafka-0.7.2.tgz and /dev/null differ