You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:01 UTC
[04/50] [abbrv] [TWILL-27] Upgrade to Kafka-0.8
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
new file mode 100644
index 0000000..d53ee98
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import org.apache.twill.common.Cancellable;
+
+import java.util.Iterator;
+
+/**
+ * A consumer for consuming (reading) messages published to a Kafka server cluster.
+ */
+public interface KafkaConsumer {
+
+ /**
+ * Callback for receiving new messages.
+ */
+ interface MessageCallback {
+
+ /**
+ * Invoked when new messages is available.
+ * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
+ * and across different invocation.
+ */
+ void onReceived(Iterator<FetchedMessage> messages);
+
+ /**
+ * Invoked when message consumption is stopped. When this method is invoked,
+ * no more {@link #onReceived(java.util.Iterator)} will get triggered.
+ */
+ void finished();
+ }
+
+ /**
+ * A builder for preparing message consumption.
+ */
+ interface Preparer {
+
+ /**
+ * Consumes messages from a given offset. If the given offset is invalid, it'll start consuming from the
+ * latest offset.
+ * @param topic Topic to consume from.
+ * @param partition Partition in the topic to consume from.
+ * @param offset Offset to starts with.
+ * @return This {@link Preparer} instance.
+ */
+ Preparer add(String topic, int partition, long offset);
+
+ /**
+ * Consumes messages from the earliest message available.
+ * @param topic Topic to consume from.
+ * @param partition Partition in the topic to consume from.
+ * @return This {@link Preparer} instance.
+ */
+ Preparer addFromBeginning(String topic, int partition);
+
+ /**
+ * Consumes messages from the latest message.
+ * @param topic Topic to consume from.
+ * @param partition Partition in the topic to consume from.
+ * @return This {@link Preparer} instance.
+ */
+ Preparer addLatest(String topic, int partition);
+
+ /**
+ * Starts the consumption as being configured by this {@link Preparer}.
+ * @param callback The {@link MessageCallback} for receiving new messages.
+ * @return A {@link Cancellable} for cancelling message consumption.
+ */
+ Cancellable consume(MessageCallback callback);
+ }
+
+ /**
+ * Prepares for message consumption.
+ * @return A {@link Preparer} to setup details about message consumption.
+ */
+ Preparer prepare();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaPublisher.java
new file mode 100644
index 0000000..bffce97
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaPublisher.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This interface is for publishing data to Kafka.
+ */
+public interface KafkaPublisher {
+
+ /**
+ * A Preparer for preparing to publish messages to a given topic. An instance of this class could be reused
+ * to send more messages after {@link #send()} is called.
+ */
+ interface Preparer {
+ /**
+ * Adds the given message to the message set, partitioned with the given partition key.
+ * @param message Remaining bytes in the ByteBuffer will be used as message payload. This method would
+ * consume the ByteBuffer, meaning after this method returns, the remaining bytes in the
+ * ByteBuffer would be {@code 0}.
+ * @param partitionKey Key for computing the partition Id to publish to. The {@link Object#hashCode()} method
+ * will be invoke to compute the id.
+ * @return
+ */
+ Preparer add(ByteBuffer message, Object partitionKey);
+
+ /**
+ * Sends all the messages being added through the {@link #add} method.
+ *
+ * @return A {@link ListenableFuture} that will be completed when the send action is done. If publish is succeeded,
+ * it returns number of messages published, otherwise the failure reason will be carried in the future.
+ * The {@link ListenableFuture#cancel(boolean)} method has no effect on the publish action.
+ */
+ ListenableFuture<Integer> send();
+ }
+
+ /**
+ * Represents the desired level of publish acknowledgment.
+ */
+ enum Ack {
+ /**
+ * Not wait for ack.
+ */
+ FIRE_AND_FORGET(0),
+
+ /**
+ * Waits for the leader received data.
+ */
+ LEADER_RECEIVED(1),
+
+ /**
+ * Waits for all replicas received data.
+ */
+ ALL_RECEIVED(-1);
+
+ private final int ack;
+
+ private Ack(int ack) {
+ this.ack = ack;
+ }
+
+ /**
+ * Returns the numerical ack number as understand by Kafka server.
+ */
+ public int getAck() {
+ return ack;
+ }
+ }
+
+ /**
+ * Prepares to publish to a given topic.
+ *
+ * @param topic Name of the topic.
+ * @return A {@link Preparer} to prepare for publishing.
+ */
+ Preparer prepare(String topic);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java b/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
deleted file mode 100644
index 5db4abb..0000000
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.nio.ByteBuffer;
-
-/**
- * This interface is for preparing to publish a set of messages to kafka.
- */
-public interface PreparePublish {
-
- PreparePublish add(byte[] payload, Object partitionKey);
-
- PreparePublish add(ByteBuffer payload, Object partitionKey);
-
- ListenableFuture<?> publish();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
new file mode 100644
index 0000000..87040be
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+import com.google.common.base.Objects;
+
+/**
+ * Represents a combination of topic and partition.
+ */
+public class TopicPartition {
+
+ private final String topic;
+ private final int partition;
+
+ public TopicPartition(String topic, int partition) {
+ this.topic = topic;
+ this.partition = partition;
+ }
+
+ public final String getTopic() {
+ return topic;
+ }
+
+ public final int getPartition() {
+ return partition;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TopicPartition other = (TopicPartition) o;
+ return partition == other.partition && topic.equals(other.topic);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = topic.hashCode();
+ result = 31 * result + partition;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("topic", topic)
+ .add("partition", partition)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java b/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
index ea3bf20..cc22d12 100644
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/package-info.java
@@ -16,6 +16,6 @@
* limitations under the License.
*/
/**
- * This package provides a pure java Kafka client interface.
+ * This package provides Kafka client interfaces.
*/
package org.apache.twill.kafka.client;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/resources/kafka-0.7.2.tgz
----------------------------------------------------------------------
diff --git a/twill-core/src/main/resources/kafka-0.7.2.tgz b/twill-core/src/main/resources/kafka-0.7.2.tgz
deleted file mode 100644
index 24178d9..0000000
Binary files a/twill-core/src/main/resources/kafka-0.7.2.tgz and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 40fc3ed..9308fb0 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -17,24 +17,16 @@
*/
package org.apache.twill.kafka.client;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Services;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.kafka.client.Compression;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.zookeeper.ZKClientService;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.Futures;
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.ArchiveException;
-import org.apache.commons.compress.archivers.ArchiveInputStream;
-import org.apache.commons.compress.archivers.ArchiveStreamFactory;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -44,11 +36,10 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.util.Iterator;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
@@ -64,7 +55,7 @@ public class KafkaTest {
private static InMemoryZKServer zkServer;
private static EmbeddedKafkaServer kafkaServer;
private static ZKClientService zkClientService;
- private static KafkaClient kafkaClient;
+ private static KafkaClientService kafkaClient;
@BeforeClass
public static void init() throws Exception {
@@ -72,12 +63,12 @@ public class KafkaTest {
zkServer.startAndWait();
// Extract the kafka.tgz and start the kafka server
- kafkaServer = new EmbeddedKafkaServer(extractKafka(), generateKafkaConfig(zkServer.getConnectionStr()));
+ kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr()));
kafkaServer.startAndWait();
zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- kafkaClient = new SimpleKafkaClient(zkClientService);
+ kafkaClient = new ZKKafkaClientService(zkClientService);
Services.chainStart(zkClientService, kafkaClient).get();
}
@@ -102,46 +93,27 @@ public class KafkaTest {
t2.join();
t3.start();
- Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, 0, 1048576);
- int count = 0;
- long startTime = System.nanoTime();
- while (count < 30 && consumer.hasNext() && secondsPassed(startTime, TimeUnit.NANOSECONDS) < 5) {
- LOG.info(Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
- count++;
- }
-
- Assert.assertEquals(30, count);
- }
-
- @Test (timeout = 10000)
- public void testOffset() throws Exception {
- String topic = "testOffset";
-
- // Initial earliest offset should be 0.
- long[] offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
- Assert.assertArrayEquals(new long[]{0L}, offsets);
-
- // Publish some messages
- Thread publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 2000);
- publishThread.start();
- publishThread.join();
-
- // Fetch earliest offset, should still be 0.
- offsets = kafkaClient.getOffset(topic, 0, -2, 10).get();
- Assert.assertArrayEquals(new long[]{0L}, offsets);
-
- // Fetch latest offset
- offsets = kafkaClient.getOffset(topic, 0, -1, 10).get();
- Iterator<FetchedMessage> consumer = kafkaClient.consume(topic, 0, offsets[0], 1048576);
+ final CountDownLatch latch = new CountDownLatch(30);
+ final CountDownLatch stopLatch = new CountDownLatch(1);
+ Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
+ .MessageCallback() {
+ @Override
+ public void onReceived(Iterator<FetchedMessage> messages) {
+ while (messages.hasNext()) {
+ LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+ latch.countDown();
+ }
+ }
- // Publish one more message, the consumer should see the new message being published.
- publishThread = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing", 1, 3000);
- publishThread.start();
- publishThread.join();
+ @Override
+ public void finished() {
+ stopLatch.countDown();
+ }
+ });
- // Should see the last message being published.
- Assert.assertTrue(consumer.hasNext());
- Assert.assertEquals("3000 Testing", Charsets.UTF_8.decode(consumer.next().getBuffer()).toString());
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ cancel.cancel();
+ Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
}
private Thread createPublishThread(final KafkaClient kafkaClient, final String topic,
@@ -153,11 +125,12 @@ public class KafkaTest {
final String message, final int count, final int base) {
return new Thread() {
public void run() {
- PreparePublish preparePublish = kafkaClient.preparePublish(topic, compression);
+ KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression);
+ KafkaPublisher.Preparer preparer = publisher.prepare(topic);
for (int i = 0; i < count; i++) {
- preparePublish.add(((base + i) + " " + message).getBytes(Charsets.UTF_8), 0);
+ preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0);
}
- Futures.getUnchecked(preparePublish.publish());
+ Futures.getUnchecked(preparer.send());
}
};
}
@@ -167,30 +140,6 @@ public class KafkaTest {
TimeUnit.NANOSECONDS);
}
- private static File extractKafka() throws IOException, ArchiveException, CompressorException {
- File kafkaExtract = TMP_FOLDER.newFolder();
- InputStream kakfaResource = KafkaTest.class.getClassLoader().getResourceAsStream("kafka-0.7.2.tgz");
- ArchiveInputStream archiveInput = new ArchiveStreamFactory()
- .createArchiveInputStream(ArchiveStreamFactory.TAR,
- new CompressorStreamFactory()
- .createCompressorInputStream(CompressorStreamFactory.GZIP, kakfaResource));
-
- try {
- ArchiveEntry entry = archiveInput.getNextEntry();
- while (entry != null) {
- File file = new File(kafkaExtract, entry.getName());
- if (entry.isDirectory()) {
- file.mkdirs();
- } else {
- ByteStreams.copy(archiveInput, Files.newOutputStreamSupplier(file));
- }
- entry = archiveInput.getNextEntry();
- }
- } finally {
- archiveInput.close();
- }
- return kafkaExtract;
- }
private static Properties generateKafkaConfig(String zkConnectStr) throws IOException {
int port = Networks.getRandomPort();
@@ -198,20 +147,20 @@ public class KafkaTest {
Properties prop = new Properties();
prop.setProperty("log.dir", TMP_FOLDER.newFolder().getAbsolutePath());
- prop.setProperty("zk.connect", zkConnectStr);
- prop.setProperty("num.threads", "8");
prop.setProperty("port", Integer.toString(port));
- prop.setProperty("log.flush.interval", "1000");
- prop.setProperty("max.socket.request.bytes", "104857600");
- prop.setProperty("log.cleanup.interval.mins", "1");
- prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
- prop.setProperty("zk.connectiontimeout.ms", "1000000");
- prop.setProperty("socket.receive.buffer", "1048576");
- prop.setProperty("enable.zookeeper", "true");
- prop.setProperty("log.retention.hours", "24");
- prop.setProperty("brokerid", "0");
- prop.setProperty("socket.send.buffer", "1048576");
+ prop.setProperty("broker.id", "1");
+ prop.setProperty("socket.send.buffer.bytes", "1048576");
+ prop.setProperty("socket.receive.buffer.bytes", "1048576");
+ prop.setProperty("socket.request.max.bytes", "104857600");
prop.setProperty("num.partitions", "1");
+ prop.setProperty("log.retention.hours", "1");
+ prop.setProperty("log.flush.interval.messages", "10000");
+ prop.setProperty("log.flush.interval.ms", "1000");
+ prop.setProperty("log.segment.bytes", "536870912");
+ prop.setProperty("zookeeper.connect", zkConnectStr);
+ prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+ prop.setProperty("default.replication.factor", "1");
+
// Use a really small file size to force some flush to happen
prop.setProperty("log.file.size", "1024");
prop.setProperty("log.default.flush.interval.ms", "1000");
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 51c8503..73cecf0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -17,6 +17,39 @@
*/
package org.apache.twill.internal.appmaster;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+import com.google.common.io.CharStreams;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.twill.api.Command;
import org.apache.twill.api.EventHandler;
import org.apache.twill.api.EventHandlerSpecification;
@@ -51,39 +84,6 @@ import org.apache.twill.internal.yarn.YarnContainerStatus;
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multiset;
-import com.google.common.collect.Sets;
-import com.google.common.io.CharStreams;
-import com.google.common.io.Files;
-import com.google.common.io.InputSupplier;
-import com.google.common.reflect.TypeToken;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -241,7 +241,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
- kafkaServer = new EmbeddedKafkaServer(new File(Constants.Files.KAFKA), generateKafkaConfig());
+ kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig());
// Must start tracker before start AMClient
LOG.info("Starting application master tracker server");
@@ -589,22 +589,19 @@ public final class ApplicationMasterService extends AbstractTwillService {
Properties prop = new Properties();
prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
- prop.setProperty("zk.connect", getKafkaZKConnect());
- prop.setProperty("num.threads", "8");
prop.setProperty("port", Integer.toString(port));
- prop.setProperty("log.flush.interval", "10000");
- prop.setProperty("max.socket.request.bytes", "104857600");
- prop.setProperty("log.cleanup.interval.mins", "1");
- prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
- prop.setProperty("zk.connectiontimeout.ms", "1000000");
- prop.setProperty("socket.receive.buffer", "1048576");
- prop.setProperty("enable.zookeeper", "true");
- prop.setProperty("log.retention.hours", "24");
- prop.setProperty("brokerid", "0");
- prop.setProperty("socket.send.buffer", "1048576");
+ prop.setProperty("broker.id", "1");
+ prop.setProperty("socket.send.buffer.bytes", "1048576");
+ prop.setProperty("socket.receive.buffer.bytes", "1048576");
+ prop.setProperty("socket.request.max.bytes", "104857600");
prop.setProperty("num.partitions", "1");
- prop.setProperty("log.file.size", "536870912");
- prop.setProperty("log.default.flush.interval.ms", "1000");
+ prop.setProperty("log.retention.hours", "24");
+ prop.setProperty("log.flush.interval.messages", "10000");
+ prop.setProperty("log.flush.interval.ms", "1000");
+ prop.setProperty("log.segment.bytes", "536870912");
+ prop.setProperty("zookeeper.connect", getKafkaZKConnect());
+ prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+ prop.setProperty("default.replication.factor", "1");
return prop;
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 17425d4..8c96629 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -243,7 +243,6 @@ final class YarnTwillPreparer implements TwillPreparer {
saveSpecification(twillSpec, runnableLocalFiles, localFiles);
saveLogback(localFiles);
saveLauncher(localFiles);
- saveKafka(localFiles);
saveVmOptions(vmOpts, localFiles);
saveArguments(new Arguments(arguments, runnableArgs), localFiles);
saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC,
@@ -498,15 +497,6 @@ final class YarnTwillPreparer implements TwillPreparer {
localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location));
}
- private void saveKafka(Map<String, LocalFile> localFiles) throws IOException {
- LOG.debug("Copy {}", Constants.Files.KAFKA);
- Location location = copyFromURL(getClass().getClassLoader().getResource(KAFKA_ARCHIVE),
- createTempLocation(Constants.Files.KAFKA));
- LOG.debug("Done {}", Constants.Files.KAFKA);
-
- localFiles.put(Constants.Files.KAFKA, createLocalFile(Constants.Files.KAFKA, location, true));
- }
-
private void saveVmOptions(String opts, Map<String, LocalFile> localFiles) throws IOException {
if (opts.isEmpty()) {
// If no vm options, no need to localize the file.