You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:23 UTC
[26/50] [abbrv] git commit: [TWILL-43] Fix Kafka log appender to retry on publish failure.
[TWILL-43] Fix Kafka log appender to retry on publish failure.
Signed-off-by: Terence Yim <te...@continuuity.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/146740b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/146740b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/146740b1
Branch: refs/heads/site
Commit: 146740b1289b7610c3eaa3c07cdaf322cea8e97a
Parents: 9f66b86
Author: Terence Yim <te...@continuuity.com>
Authored: Tue Feb 11 14:07:19 2014 -0800
Committer: Terence Yim <te...@continuuity.com>
Committed: Wed Feb 12 14:21:52 2014 -0800
----------------------------------------------------------------------
.../apache/twill/internal/json/JsonUtils.java | 2 +-
.../kafka/client/SimpleKafkaPublisher.java | 14 ++-
.../twill/internal/logging/KafkaAppender.java | 114 ++++++++++++++-----
.../apache/twill/yarn/LogHandlerTestRun.java | 82 +++++++++++++
.../org/apache/twill/yarn/YarnTestSuite.java | 3 +-
5 files changed, 177 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java b/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
index 9556ad8..7e6a11b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
@@ -33,7 +33,7 @@ public final class JsonUtils {
*/
public static String getAsString(JsonObject json, String property) {
JsonElement jsonElement = json.get(property);
- if (jsonElement.isJsonNull()) {
+ if (jsonElement == null || jsonElement.isJsonNull()) {
return null;
}
if (jsonElement.isJsonPrimitive()) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index 97b14c6..302a6a4 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -90,11 +90,15 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
@Override
public ListenableFuture<Integer> send() {
- int size = messages.size();
- producer.send(messages);
-
- messages.clear();
- return Futures.immediateFuture(size);
+ try {
+ int size = messages.size();
+ producer.send(messages);
+ return Futures.immediateFuture(size);
+ } catch (Exception e) {
+ return Futures.immediateFailedFuture(e);
+ } finally {
+ messages.clear();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
index 9a22bea..8345865 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -27,11 +27,14 @@ import ch.qos.logback.classic.spi.StackTraceElementProxy;
import ch.qos.logback.core.AppenderBase;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
import com.google.gson.stream.JsonWriter;
import org.apache.twill.common.Services;
import org.apache.twill.common.Threads;
@@ -48,11 +51,16 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -141,22 +149,24 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
kafkaClient = new ZKKafkaClientService(zkClientService);
- Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), new FutureCallback<Object>() {
+ Futures.addCallback(Services.chainStart(zkClientService, kafkaClient),
+ new FutureCallback<List<ListenableFuture<Service.State>>>() {
@Override
- public void onSuccess(Object result) {
+ public void onSuccess(List<ListenableFuture<Service.State>> result) {
+ for (ListenableFuture<Service.State> future : result) {
+ Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING,
+ "Service is not running.");
+ }
LOG.info("Kafka client started: " + zkConnectStr);
- KafkaPublisher.Preparer preparer = kafkaClient.getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED,
- Compression.SNAPPY).prepare(topic);
- publisher.set(preparer);
scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
}
@Override
public void onFailure(Throwable t) {
// Fail to talk to kafka. Other than logging, what can be done?
- LOG.error("Failed to start kafka client.", t);
+ LOG.error("Failed to start kafka appender.", t);
}
- });
+ }, Threads.SAME_THREAD_EXECUTOR);
super.start();
}
@@ -170,7 +180,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
public void forceFlush() {
try {
- publishLogs().get(2, TimeUnit.SECONDS);
+ publishLogs(2, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Failed to publish last batch of log.", e);
}
@@ -185,24 +195,71 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
}
}
- private ListenableFuture<Integer> publishLogs() {
- // If the publisher is not available, simply returns a completed future.
- KafkaPublisher.Preparer publisher = KafkaAppender.this.publisher.get();
- if (publisher == null) {
- return Futures.immediateFuture(0);
- }
+ /**
+ * Publishes buffered logs to Kafka, within the given timeout.
+ *
+ * @return Number of logs published.
+ * @throws TimeoutException If timeout reached before publish completed.
+ */
+ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutException {
+ List<ByteBuffer> logs = Lists.newArrayListWithExpectedSize(bufferedSize.get());
- int count = 0;
for (String json : Iterables.consumingIterable(buffer)) {
- publisher.add(Charsets.UTF_8.encode(json), 0);
- count++;
+ logs.add(Charsets.UTF_8.encode(json));
+ }
+
+ long backOffTime = timeoutUnit.toNanos(timeout) / 10;
+ if (backOffTime <= 0) {
+ backOffTime = 1;
+ }
+
+ try {
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.start();
+ long publishTimeout = timeout;
+
+ do {
+ try {
+ int published = doPublishLogs(logs).get(publishTimeout, timeoutUnit);
+ bufferedSize.addAndGet(-published);
+ return published;
+ } catch (ExecutionException e) {
+ LOG.error("Failed to publish logs to Kafka.", e);
+ TimeUnit.NANOSECONDS.sleep(backOffTime);
+ publishTimeout -= stopwatch.elapsedTime(timeoutUnit);
+ stopwatch.reset();
+ stopwatch.start();
+ }
+ } while (publishTimeout > 0);
+ } catch (InterruptedException e) {
+ LOG.warn("Logs publish to Kafka interrupted.", e);
}
+ return 0;
+ }
+
+ private ListenableFuture<Integer> doPublishLogs(Collection <ByteBuffer> logs) {
// Nothing to publish, simply returns a completed future.
- if (count == 0) {
+ if (logs.isEmpty()) {
return Futures.immediateFuture(0);
}
- bufferedSize.set(0);
+ // If the publisher is not available, tries to create one.
+ KafkaPublisher.Preparer publisher = KafkaAppender.this.publisher.get();
+ if (publisher == null) {
+ try {
+ KafkaPublisher.Preparer preparer = kafkaClient.getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED,
+ Compression.SNAPPY).prepare(topic);
+ KafkaAppender.this.publisher.compareAndSet(null, preparer);
+ publisher = KafkaAppender.this.publisher.get();
+ } catch (Exception e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ for (ByteBuffer buffer : logs) {
+ publisher.add(buffer, 0);
+ }
+
return publisher.send();
}
@@ -214,19 +271,14 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
return new Runnable() {
@Override
public void run() {
- Futures.addCallback(publishLogs(), new FutureCallback<Integer>() {
- @Override
- public void onSuccess(Integer result) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Log entries published, size=" + result);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Failed to push logs to kafka. Log entries dropped.", t);
+ try {
+ int published = publishLogs(2L, TimeUnit.SECONDS);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Published {} log messages to Kafka.", published);
}
- });
+ } catch (Exception e) {
+ LOG.error("Failed to push logs to Kafka. Log entries dropped.", e);
+ }
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
new file mode 100644
index 0000000..30a5a41
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.common.Services;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test for LogHandler able to receive logs from AM and runnable.
+ */
+public class LogHandlerTestRun extends BaseYarnTest {
+
+ @Test
+ public void testLogHandler() throws ExecutionException, InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(2);
+
+ LogHandler logHandler = new LogHandler() {
+ @Override
+ public void onLog(LogEntry logEntry) {
+ // Would expect logs from AM and the runnable.
+ if (logEntry.getMessage().startsWith("Starting runnable " + LogRunnable.class.getSimpleName())) {
+ latch.countDown();
+ } else if (logEntry.getMessage().equals("Running")) {
+ latch.countDown();
+ }
+ }
+ };
+
+ TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillController controller = runner.prepare(new LogRunnable())
+ .addLogHandler(logHandler)
+ .start();
+
+ Services.getCompletionFuture(controller).get();
+ latch.await(1, TimeUnit.SECONDS);
+ }
+
+ /**
+ * TwillRunnable for the test case to simply emit one log line.
+ */
+ public static final class LogRunnable extends AbstractTwillRunnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LogRunnable.class);
+
+
+ @Override
+ public void run() {
+ LOG.info("Running");
+ }
+
+ @Override
+ public void stop() {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index 912b713..bed613a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -31,7 +31,8 @@ import org.junit.runners.Suite;
DistributeShellTestRun.class,
LocalFileTestRun.class,
FailureRestartTestRun.class,
- ProvisionTimeoutTestRun.class
+ ProvisionTimeoutTestRun.class,
+ LogHandlerTestRun.class
})
public final class YarnTestSuite {