You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2020/01/21 20:02:16 UTC
[twill] branch master updated: (TWILL-265) Upgrading kafka version
and removing lz4 dependency
This is an automated email from the ASF dual-hosted git repository.
chtyim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/twill.git
The following commit(s) were added to refs/heads/master by this push:
new 944fb79 (TWILL-265) Upgrading kafka version and removing lz4 dependency
944fb79 is described below
commit 944fb7932d7f911eea0402115ee7a8e2fed5d671
Author: vinisha <vi...@google.com>
AuthorDate: Wed Jan 15 11:04:47 2020 -0800
(TWILL-265) Upgrading kafka version and removing lz4 dependency
This closes #84 on Github.
Signed-off-by: Terence Yim <te...@google.com>
---
pom.xml | 13 +++++++--
twill-core/pom.xml | 4 +++
.../internal/kafka/client/IntegerPartitioner.java | 34 ----------------------
.../kafka/client/SimpleKafkaPublisher.java | 3 +-
.../org/apache/twill/kafka/client/KafkaTest.java | 19 ++++++------
.../org/apache/twill/internal/ServiceMain.java | 19 ++++++++++--
6 files changed, 44 insertions(+), 48 deletions(-)
diff --git a/pom.xml b/pom.xml
index b76b1b3..401d97c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,7 +165,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
- <slf4j.version>1.7.5</slf4j.version>
+ <slf4j.version>1.7.30</slf4j.version>
<logback.version>1.0.9</logback.version>
<guava.version>13.0.1</guava.version>
<gson.version>2.2.4</gson.version>
@@ -174,7 +174,7 @@
<snappy-java.version>1.0.5</snappy-java.version>
<jcl-over-slf4j.version>1.7.2</jcl-over-slf4j.version>
<asm.version>7.1</asm.version>
- <kafka.version>0.8.0</kafka.version>
+ <kafka.version>0.8.2.2</kafka.version>
<zkclient.version>0.10</zkclient.version>
<zookeeper.version>3.4.5</zookeeper.version>
<junit.version>4.11</junit.version>
@@ -754,8 +754,17 @@
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index 249c061..603e809 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -89,6 +89,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </dependency>
<!-- https://mvnrepository.com/artifact/net.sf.jopt-simple/jopt-simple -->
<dependency>
<groupId>net.sf.jopt-simple</groupId>
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
deleted file mode 100644
index 4aa7940..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * A kafka {@link kafka.producer.Partitioner} using integer key to compute partition id.
- */
-public final class IntegerPartitioner implements Partitioner<Integer> {
-
- public IntegerPartitioner(VerifiableProperties properties) {
- }
-
- public int partition(Integer key, int numPartitions) {
- return key % numPartitions;
- }
-}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index e5d0f8d..832faa8 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicReference;
final class SimpleKafkaPublisher implements KafkaPublisher {
private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaPublisher.class);
+ private static final int MAX_MESSAGE_BYTES = 1024 * 1024 * 10;
private final BrokerService brokerService;
private final Ack ack;
@@ -171,9 +172,9 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
props.put("metadata.broker.list", newBrokerList);
props.put("serializer.class", ByteBufferEncoder.class.getName());
props.put("key.serializer.class", IntegerEncoder.class.getName());
- props.put("partitioner.class", IntegerPartitioner.class.getName());
props.put("request.required.acks", Integer.toString(ack.getAck()));
props.put("compression.codec", compression.getCodec());
+ props.put("message.max.bytes", Integer.toString(MAX_MESSAGE_BYTES));
ProducerConfig config = new ProducerConfig(props);
newProducer = new Producer<>(config);
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 958925c..e7e7e7c 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -135,6 +135,9 @@ public class KafkaTest {
server = new EmbeddedKafkaServer(kafkaServerConfig);
server.startAndWait();
+ // Wait a little while to make sure changes is reflected in broker service
+ TimeUnit.SECONDS.sleep(3);
+
// Publish another message
createPublishThread(kafkaClient, topic, Compression.NONE, "Second message", 1).start();
@@ -308,16 +311,14 @@ public class KafkaTest {
private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
final String message, final int count, final int base) {
- return new Thread() {
- public void run() {
- KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression);
- KafkaPublisher.Preparer preparer = publisher.prepare(topic);
- for (int i = 0; i < count; i++) {
- preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0);
- }
- Futures.getUnchecked(preparer.send());
+ return new Thread(() -> {
+ KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression);
+ KafkaPublisher.Preparer preparer = publisher.prepare(topic);
+ for (int i = 0; i < count; i++) {
+ preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0);
}
- };
+ Futures.getUnchecked(preparer.send());
+ });
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index b27dcd8..ca0bc08 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -18,8 +18,11 @@
package org.apache.twill.internal;
import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.util.ContextInitializer;
+import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.joran.spi.JoranException;
+import ch.qos.logback.core.spi.FilterReply;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
@@ -211,7 +214,14 @@ public abstract class ServiceMain {
}
}
+ KafkaAppender kafkaAppender = getKafkaAppender(context);
+ kafkaAppender.start();
+
// Attach the KafkaAppender to the root logger
+ context.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(kafkaAppender);
+ }
+
+ private KafkaAppender getKafkaAppender(LoggerContext context) {
KafkaAppender kafkaAppender = new KafkaAppender();
kafkaAppender.setName("KAFKA");
kafkaAppender.setTopic(Constants.LOG_TOPIC);
@@ -223,10 +233,15 @@ public abstract class ServiceMain {
kafkaAppender.setRunnableName(runnableName);
}
+ kafkaAppender.addFilter(new Filter<ILoggingEvent>() {
+ @Override
+ public FilterReply decide(ILoggingEvent event) {
+ return event.getLoggerName().startsWith("kafka.") ? FilterReply.DENY : FilterReply.ACCEPT;
+ }
+ });
kafkaAppender.setContext(context);
- kafkaAppender.start();
- context.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(kafkaAppender);
+ return kafkaAppender;
}
/**