Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:01 UTC

[04/50] [abbrv] [TWILL-27] Upgrade to Kafka-0.8

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
new file mode 100644
index 0000000..d53ee98
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import org.apache.twill.common.Cancellable;
+
+import java.util.Iterator;
+
+/**
+ * A consumer for reading messages published to a Kafka cluster.
+ */
+public interface KafkaConsumer {
+
+  /**
+   * Callback for receiving new messages.
+   */
+  interface MessageCallback {
+
+    /**
+     * Invoked when new messages are available.
+     * @param messages Iterator of new messages. The {@link FetchedMessage} instance may be reused in the Iterator
+     *                 and across different invocations.
+     */
+    void onReceived(Iterator<FetchedMessage> messages);
+
+    /**
+     * Invoked when message consumption is stopped. Once this method is invoked,
+     * {@link #onReceived(java.util.Iterator)} will no longer be triggered.
+     */
+    void finished();
+  }
+
+  /**
+   * A builder for preparing message consumption.
+   */
+  interface Preparer {
+
+    /**
+     * Consumes messages from a given offset. If the given offset is invalid, consumption starts from the
+     * latest offset.
+     * @param topic Topic to consume from.
+     * @param partition Partition in the topic to consume from.
+     * @param offset Offset to start with.
+     * @return This {@link Preparer} instance.
+     */
+    Preparer add(String topic, int partition, long offset);
+
+    /**
+     * Consumes messages from the earliest message available.
+     * @param topic Topic to consume from.
+     * @param partition Partition in the topic to consume from.
+     * @return This {@link Preparer} instance.
+     */
+    Preparer addFromBeginning(String topic, int partition);
+
+    /**
+     * Consumes messages from the latest message.
+     * @param topic Topic to consume from.
+     * @param partition Partition in the topic to consume from.
+     * @return This {@link Preparer} instance.
+     */
+    Preparer addLatest(String topic, int partition);
+
+    /**
+     * Starts the consumption as configured by this {@link Preparer}.
+     * @param callback The {@link MessageCallback} for receiving new messages.
+     * @return A {@link Cancellable} for cancelling message consumption.
+     */
+    Cancellable consume(MessageCallback callback);
+  }
+
+  /**
+   * Prepares for message consumption.
+   * @return A {@link Preparer} to set up details about message consumption.
+   */
+  Preparer prepare();
+}
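
A minimal usage sketch of the consumer interface above (not part of this commit): "kafkaClient" is assumed to be an already-started KafkaClientService, and the topic name is illustrative. The pattern mirrors the updated KafkaTest further down.

    Cancellable cancellable = kafkaClient.getConsumer().prepare()
      .add("logs", 0, 0)    // topic "logs", partition 0, start at offset 0
      .consume(new KafkaConsumer.MessageCallback() {
        @Override
        public void onReceived(Iterator<FetchedMessage> messages) {
          while (messages.hasNext()) {
            // FetchedMessage instances may be reused across iterations and
            // invocations, so copy the payload if it must outlive this callback.
            System.out.println(Charsets.UTF_8.decode(messages.next().getPayload()));
          }
        }

        @Override
        public void finished() {
          // No further onReceived() calls will be made after this point.
        }
      });
    // Later, to stop consuming:
    cancellable.cancel();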

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaPublisher.java
new file mode 100644
index 0000000..bffce97
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaPublisher.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This interface is for publishing data to Kafka.
+ */
+public interface KafkaPublisher {
+
+  /**
+   * A Preparer for preparing to publish messages to a given topic. An instance of this class can be reused
+   * to send more messages after {@link #send()} is called.
+   */
+  interface Preparer {
+    /**
+     * Adds the given message to the message set, partitioned with the given partition key.
+     * @param message Remaining bytes in the ByteBuffer will be used as the message payload. This method
+     *                consumes the ByteBuffer: after it returns, the number of bytes remaining in the
+     *                ByteBuffer will be {@code 0}.
+     * @param partitionKey Key for computing the partition id to publish to. The {@link Object#hashCode()} method
+     *                     will be invoked to compute the id.
+     * @return This {@link Preparer} instance.
+     */
+    Preparer add(ByteBuffer message, Object partitionKey);
+
+    /**
+     * Sends all messages added through the {@link #add} method.
+     *
+     * @return A {@link ListenableFuture} that will be completed when the send action is done. If the publish
+     *         succeeds, the future carries the number of messages published; otherwise it carries the failure reason.
+     *         The {@link ListenableFuture#cancel(boolean)} method has no effect on the publish action.
+     */
+    ListenableFuture<Integer> send();
+  }
+
+  /**
+   * Represents the desired level of publish acknowledgment.
+   */
+  enum Ack {
+    /**
+     * Does not wait for any ack.
+     */
+    FIRE_AND_FORGET(0),
+
+    /**
+     * Waits until the leader has received the data.
+     */
+    LEADER_RECEIVED(1),
+
+    /**
+     * Waits until all replicas have received the data.
+     */
+    ALL_RECEIVED(-1);
+
+    private final int ack;
+
+    private Ack(int ack) {
+      this.ack = ack;
+    }
+
+    /**
+     * Returns the numerical ack value as understood by the Kafka server.
+     */
+    public int getAck() {
+      return ack;
+    }
+  }
+
+  /**
+   * Prepares to publish to a given topic.
+   *
+   * @param topic Name of the topic.
+   * @return A {@link Preparer} to prepare for publishing.
+   */
+  Preparer prepare(String topic);
+}
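
A matching publish sketch (not part of this commit): "kafkaClient" and the topic are again illustrative, and obtaining a publisher via getPublisher(Ack, Compression) follows the updated KafkaTest further down. The Ack constants wrap the numeric values 0, 1 and -1 exposed by getAck(), which correspond to Kafka 0.8's request.required.acks producer setting.

    KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, Compression.NONE);
    ListenableFuture<Integer> sendResult = publisher.prepare("logs")
      .add(Charsets.UTF_8.encode("message one"), 0)   // partition key 0
      .add(Charsets.UTF_8.encode("message two"), 0)
      .send();
    // Blocks until the send completes; on success the result is the number of messages published.
    int published = Futures.getUnchecked(sendResult);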

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java b/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
deleted file mode 100644
index 5db4abb..0000000
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.nio.ByteBuffer;
-
-/**
- * This interface is for preparing to publish a set of messages to kafka.
- */
-public interface PreparePublish {
-
-  PreparePublish add(byte[] payload, Object partitionKey);
-
-  PreparePublish add(ByteBuffer payload, Object partitionKey);
-
-  ListenableFuture<?> publish();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
new file mode 100644
index 0000000..87040be
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import com.google.common.base.Objects;
+
+/**
+ * Represents a combination of topic and partition.
+ */
+public class TopicPartition {
+
+  private final String topic;
+  private final int partition;
+
+  public TopicPartition(String topic, int partition) {
+    this.topic = topic;
+    this.partition = partition;
+  }
+
+  public final String getTopic() {
+    return topic;
+  }
+
+  public final int getPartition() {
+    return partition;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    TopicPartition other = (TopicPartition) o;
+    return partition == other.partition && topic.equals(other.topic);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = topic.hashCode();
+    result = 31 * result + partition;
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("topic", topic)
+      .add("partition", partition)
+      .toString();
+  }
+}
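
Since TopicPartition overrides equals() and hashCode(), it is safe to use as a key in hash-based collections. A trivial sketch (values are illustrative, not from this commit):

    Map<TopicPartition, Long> offsets = Maps.newHashMap();
    offsets.put(new TopicPartition("logs", 0), 42L);
    long nextOffset = offsets.get(new TopicPartition("logs", 0));  // 42, looked up via a distinct but equal key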

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java b/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
index ea3bf20..cc22d12 100644
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * This package provides a pure java Kafka client interface.
+ * This package provides Kafka client interfaces.
  */
 package org.apache.twill.kafka.client;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/resources/kafka-0.7.2.tgz
----------------------------------------------------------------------
diff --git a/twill-core/src/main/resources/kafka-0.7.2.tgz b/twill-core/src/main/resources/kafka-0.7.2.tgz
deleted file mode 100644
index 24178d9..0000000
Binary files a/twill-core/src/main/resources/kafka-0.7.2.tgz and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 40fc3ed..9308fb0 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -17,24 +17,16 @@
  */
 package org.apache.twill.kafka.client;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Services;
 import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.kafka.client.Compression;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
 import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.internal.zookeeper.InMemoryZKServer;
 import org.apache.twill.zookeeper.ZKClientService;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.Futures;
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.ArchiveException;
-import org.apache.commons.compress.archivers.ArchiveInputStream;
-import org.apache.commons.compress.archivers.ArchiveStreamFactory;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,11 +36,10 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -64,7 +55,7 @@ public class KafkaTest {
   private static InMemoryZKServer zkServer;
   private static EmbeddedKafkaServer kafkaServer;
   private static ZKClientService zkClientService;
-  private static KafkaClient kafkaClient;
+  private static KafkaClientService kafkaClient;
 
   @BeforeClass
   public static void init() throws Exception {
@@ -72,12 +63,12 @@ public class KafkaTest {
     zkServer.startAndWait();
 
     // Start the kafka server
-    kafkaServer = new EmbeddedKafkaServer(extractKafka(), generateKafkaConfig(zkServer.getConnectionStr()));
+    kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr()));
     kafkaServer.startAndWait();
 
     zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
 
-    kafkaClient = new SimpleKafkaClient(zkClientService);
+    kafkaClient = new ZKKafkaClientService(zkClientService);
     Services.chainStart(zkClientService, kafkaClient).get();
   }
 
@@ -102,46 +93,27 @@ public class KafkaTest {
     t2.join();
     t3.start();
 
-    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, 0, 1048576);
-    int count = 0;
-    long startTime = System.nanoTime();
-    while (count < 30 && consumer.hasNext() && secondsPassed(startTime, TimeUnit.NANOSECONDS) < 5) {
-      LOG.info(Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
-      count++;
-    }
-
-    Assert.assertEquals(30, count);
-  }
-
-  @Test (timeout = 10000)
-  public void testOffset() throws Exception {
-    String topic = "testOffset";
-
-    // Initial earliest offset should be 0.
-    long[] offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
-    Assert.assertArrayEquals(new long[]{0L}, offsets);
-
-    // Publish some messages
-    Thread publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 2000);
-    publishThread.start();
-    publishThread.join();
-
-    // Fetch earliest offset, should still be 0.
-    offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
-    Assert.assertArrayEquals(new long[]{0L}, offsets);
-
-    // Fetch latest offset
-    offsets = kafkaClient.getOffset(topic, 0, -1, 10).get();
-    Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, offsets[0], 1048576);
+    final CountDownLatch latch = new CountDownLatch(30);
+    final CountDownLatch stopLatch = new CountDownLatch(1);
+    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
+      .consume(new KafkaConsumer.MessageCallback() {
+      @Override
+      public void onReceived(Iterator<FetchedMessage> messages) {
+        while (messages.hasNext()) {
+          LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+          latch.countDown();
+        }
+      }
 
-    // Publish one more message, the consumer should see the new message being published.
-    publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 1, 3000);
-    publishThread.start();
-    publishThread.join();
+      @Override
+      public void finished() {
+        stopLatch.countDown();
+      }
+    });
 
-    // Should see the last message being published.
-    Assert.assertTrue(consumer.hasNext());
-    Assert.assertEquals("3000 Testing", Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+    cancel.cancel();
+    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
   }
 
   private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
@@ -153,11 +125,12 @@ public class KafkaTest {
                                      final String message, final int count, final int base) {
     return new Thread() {
       public void run() {
-        PreparePublish preparePublish = kafkaClient.preparePublish(topic, compression);
+        KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression);
+        KafkaPublisher.Preparer preparer = publisher.prepare(topic);
         for (int i = 0; i < count; i++) {
-          preparePublish.add(((base + i) + " " + message).getBytes(Charsets.UTF_8), 0);
+          preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0);
         }
-        Futures.getUnchecked(preparePublish.publish());
+        Futures.getUnchecked(preparer.send());
       }
     };
   }
@@ -167,30 +140,6 @@ public class KafkaTest {
                                     TimeUnit.NANOSECONDS);
   }
 
-  private static File extractKafka() throws IOException, ArchiveException, CompressorException {
-    File kafkaExtract = TMP_FOLDER.newFolder();
-    InputStream kakfaResource = KafkaTest.class.getClassLoader().getResourceAsStream("kafka-0.7.2.tgz");
-    ArchiveInputStream archiveInput = new ArchiveStreamFactory()
-      .createArchiveInputStream(ArchiveStreamFactory.TAR,
-                                new CompressorStreamFactory()
-                                  .createCompressorInputStream(CompressorStreamFactory.GZIP, kakfaResource));
-
-    try {
-      ArchiveEntry entry = archiveInput.getNextEntry();
-      while (entry != null) {
-        File file = new File(kafkaExtract, entry.getName());
-        if (entry.isDirectory()) {
-          file.mkdirs();
-        } else {
-          ByteStreams.copy(archiveInput, Files.newOutputStreamSupplier(file));
-        }
-        entry = archiveInput.getNextEntry();
-      }
-    } finally {
-      archiveInput.close();
-    }
-    return kafkaExtract;
-  }
 
   private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
     int port = Networks.getRandomPort();
@@ -198,20 +147,20 @@ public class KafkaTest {
 
     Properties prop = new Properties();
     prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
-    prop.setProperty("zk.connect", zkConnectStr);
-    prop.setProperty("num.threads", "8");
     prop.setProperty("port", Integer.toString(port));
-    prop.setProperty("log.flush.interval", "1000");
-    prop.setProperty("max.socket.request.bytes", "104857600");
-    prop.setProperty("log.cleanup.interval.mins", "1");
-    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
-    prop.setProperty("zk.connectiontimeout.ms", "1000000");
-    prop.setProperty("socket.receive.buffer", "1048576");
-    prop.setProperty("enable.zookeeper", "true");
-    prop.setProperty("log.retention.hours", "24");
-    prop.setProperty("brokerid", "0");
-    prop.setProperty("socket.send.buffer", "1048576");
+    prop.setProperty("broker.id", "1");
+    prop.setProperty("socket.send.buffer.bytes", "1048576");
+    prop.setProperty("socket.receive.buffer.bytes", "1048576");
+    prop.setProperty("socket.request.max.bytes", "104857600");
     prop.setProperty("num.partitions", "1");
+    prop.setProperty("log.retention.hours", "1");
+    prop.setProperty("log.flush.interval.messages", "10000");
+    prop.setProperty("log.flush.interval.ms", "1000");
+    prop.setProperty("log.segment.bytes", "536870912");
+    prop.setProperty("zookeeper.connect", zkConnectStr);
+    prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+    prop.setProperty("default.replication.factor", "1");
+
     // Use a really small file size to force some flush to happen
     prop.setProperty("log.file.size", "1024");
     prop.setProperty("log.default.flush.interval.ms", "1000");

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 51c8503..73cecf0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -17,6 +17,39 @@
  */
 package org.apache.twill.internal.appmaster;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+import com.google.common.io.CharStreams;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.twill.api.Command;
 import org.apache.twill.api.EventHandler;
 import org.apache.twill.api.EventHandlerSpecification;
@@ -51,39 +84,6 @@ import org.apache.twill.internal.yarn.YarnContainerStatus;
 import org.apache.twill.internal.yarn.YarnUtils;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multiset;
-import com.google.common.collect.Sets;
-import com.google.common.io.CharStreams;
-import com.google.common.io.Files;
-import com.google.common.io.InputSupplier;
-import com.google.common.reflect.TypeToken;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -241,7 +241,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
 
     instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
 
-    kafkaServer = new EmbeddedKafkaServer(new File(Constants.Files.KAFKA), generateKafkaConfig());
+    kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig());
 
     // Must start tracker before start AMClient
     LOG.info("Starting application master tracker server");
@@ -589,22 +589,19 @@ public final class ApplicationMasterService extends AbstractTwillService {
 
     Properties prop = new Properties();
     prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
-    prop.setProperty("zk.connect", getKafkaZKConnect());
-    prop.setProperty("num.threads", "8");
     prop.setProperty("port", Integer.toString(port));
-    prop.setProperty("log.flush.interval", "10000");
-    prop.setProperty("max.socket.request.bytes", "104857600");
-    prop.setProperty("log.cleanup.interval.mins", "1");
-    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
-    prop.setProperty("zk.connectiontimeout.ms", "1000000");
-    prop.setProperty("socket.receive.buffer", "1048576");
-    prop.setProperty("enable.zookeeper", "true");
-    prop.setProperty("log.retention.hours", "24");
-    prop.setProperty("brokerid", "0");
-    prop.setProperty("socket.send.buffer", "1048576");
+    prop.setProperty("broker.id", "1");
+    prop.setProperty("socket.send.buffer.bytes", "1048576");
+    prop.setProperty("socket.receive.buffer.bytes", "1048576");
+    prop.setProperty("socket.request.max.bytes", "104857600");
     prop.setProperty("num.partitions", "1");
-    prop.setProperty("log.file.size", "536870912");
-    prop.setProperty("log.default.flush.interval.ms", "1000");
+    prop.setProperty("log.retention.hours", "24");
+    prop.setProperty("log.flush.interval.messages", "10000");
+    prop.setProperty("log.flush.interval.ms", "1000");
+    prop.setProperty("log.segment.bytes", "536870912");
+    prop.setProperty("zookeeper.connect", getKafkaZKConnect());
+    prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+    prop.setProperty("default.replication.factor", "1");
     return prop;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 17425d4..8c96629 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -243,7 +243,6 @@ final class YarnTwillPreparer implements TwillPreparer {
           saveSpecification(twillSpec, runnableLocalFiles, localFiles);
           saveLogback(localFiles);
           saveLauncher(localFiles);
-          saveKafka(localFiles);
           saveVmOptions(vmOpts, localFiles);
           saveArguments(new Arguments(arguments, runnableArgs), localFiles);
           saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
@@ -498,15 +497,6 @@ final class YarnTwillPreparer implements TwillPreparer {
     localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location));
   }
 
-  private void saveKafka(Map<String, LocalFile> localFiles) throws IOException {
-    LOG.debug("Copy {}", Constants.Files.KAFKA);
-    Location location = copyFromURL(getClass().getClassLoader().getResource(KAFKA_ARCHIVE),
-                                    createTempLocation(Constants.Files.KAFKA));
-    LOG.debug("Done {}", Constants.Files.KAFKA);
-
-    localFiles.put(Constants.Files.KAFKA, createLocalFile(Constants.Files.KAFKA, location, true));
-  }
-
   private void saveVmOptions(String opts, Map<String, LocalFile> localFiles) throws IOException {
     if (opts.isEmpty()) {
       // If no vm options, no need to localize the file.