You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:23 UTC

[26/50] [abbrv] git commit: [TWILL-43] Fix Kafka log appender to retry on publish failure.

[TWILL-43] Fix Kafka log appender to retry on publish failure.

Signed-off-by: Terence Yim <te...@continuuity.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/146740b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/146740b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/146740b1

Branch: refs/heads/site
Commit: 146740b1289b7610c3eaa3c07cdaf322cea8e97a
Parents: 9f66b86
Author: Terence Yim <te...@continuuity.com>
Authored: Tue Feb 11 14:07:19 2014 -0800
Committer: Terence Yim <te...@continuuity.com>
Committed: Wed Feb 12 14:21:52 2014 -0800

----------------------------------------------------------------------
 .../apache/twill/internal/json/JsonUtils.java   |   2 +-
 .../kafka/client/SimpleKafkaPublisher.java      |  14 ++-
 .../twill/internal/logging/KafkaAppender.java   | 114 ++++++++++++++-----
 .../apache/twill/yarn/LogHandlerTestRun.java    |  82 +++++++++++++
 .../org/apache/twill/yarn/YarnTestSuite.java    |   3 +-
 5 files changed, 177 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java b/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
index 9556ad8..7e6a11b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/JsonUtils.java
@@ -33,7 +33,7 @@ public final class JsonUtils {
    */
   public static String getAsString(JsonObject json, String property) {
     JsonElement jsonElement = json.get(property);
-    if (jsonElement.isJsonNull()) {
+    if (jsonElement == null || jsonElement.isJsonNull()) {
       return null;
     }
     if (jsonElement.isJsonPrimitive()) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index 97b14c6..302a6a4 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -90,11 +90,15 @@ final class SimpleKafkaPublisher implements KafkaPublisher {
 
     @Override
     public ListenableFuture<Integer> send() {
-      int size = messages.size();
-      producer.send(messages);
-
-      messages.clear();
-      return Futures.immediateFuture(size);
+      try {
+        int size = messages.size();
+        producer.send(messages);
+        return Futures.immediateFuture(size);
+      } catch (Exception e) {
+        return Futures.immediateFailedFuture(e);
+      } finally {
+        messages.clear();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
index 9a22bea..8345865 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -27,11 +27,14 @@ import ch.qos.logback.classic.spi.StackTraceElementProxy;
 import ch.qos.logback.core.AppenderBase;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
 import com.google.gson.stream.JsonWriter;
 import org.apache.twill.common.Services;
 import org.apache.twill.common.Threads;
@@ -48,11 +51,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -141,22 +149,24 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
                                  RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
 
     kafkaClient = new ZKKafkaClientService(zkClientService);
-    Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), new FutureCallback<Object>() {
+    Futures.addCallback(Services.chainStart(zkClientService, kafkaClient),
+                        new FutureCallback<List<ListenableFuture<Service.State>>>() {
       @Override
-      public void onSuccess(Object result) {
+      public void onSuccess(List<ListenableFuture<Service.State>> result) {
+        for (ListenableFuture<Service.State> future : result) {
+          Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING,
+                                   "Service is not running.");
+        }
         LOG.info("Kafka client started: " + zkConnectStr);
-        KafkaPublisher.Preparer preparer = kafkaClient.getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED,
-                                                                    Compression.SNAPPY).prepare(topic);
-        publisher.set(preparer);
         scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
       }
 
       @Override
       public void onFailure(Throwable t) {
         // Fail to talk to kafka. Other than logging, what can be done?
-        LOG.error("Failed to start kafka client.", t);
+        LOG.error("Failed to start kafka appender.", t);
       }
-    });
+    }, Threads.SAME_THREAD_EXECUTOR);
 
     super.start();
   }
@@ -170,7 +180,7 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
 
   public void forceFlush() {
     try {
-      publishLogs().get(2, TimeUnit.SECONDS);
+      publishLogs(2, TimeUnit.SECONDS);
     } catch (Exception e) {
       LOG.error("Failed to publish last batch of log.", e);
     }
@@ -185,24 +195,71 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
     }
   }
 
-  private ListenableFuture<Integer> publishLogs() {
-    // If the publisher is not available, simply returns a completed future.
-    KafkaPublisher.Preparer publisher = KafkaAppender.this.publisher.get();
-    if (publisher == null) {
-      return Futures.immediateFuture(0);
-    }
+  /**
+   * Publishes buffered logs to Kafka, within the given timeout.
+   *
+   * @return Number of logs published.
+   * @throws TimeoutException If timeout reached before publish completed.
+   */
+  private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutException {
+    List<ByteBuffer> logs = Lists.newArrayListWithExpectedSize(bufferedSize.get());
 
-    int count = 0;
     for (String json : Iterables.consumingIterable(buffer)) {
-      publisher.add(Charsets.UTF_8.encode(json), 0);
-      count++;
+      logs.add(Charsets.UTF_8.encode(json));
+    }
+
+    long backOffTime = timeoutUnit.toNanos(timeout) / 10;
+    if (backOffTime <= 0) {
+      backOffTime = 1;
+    }
+
+    try {
+      Stopwatch stopwatch = new Stopwatch();
+      stopwatch.start();
+      long publishTimeout = timeout;
+
+      do {
+        try {
+          int published = doPublishLogs(logs).get(publishTimeout, timeoutUnit);
+          bufferedSize.addAndGet(-published);
+          return published;
+        } catch (ExecutionException e) {
+          LOG.error("Failed to publish logs to Kafka.", e);
+          TimeUnit.NANOSECONDS.sleep(backOffTime);
+          publishTimeout -= stopwatch.elapsedTime(timeoutUnit);
+          stopwatch.reset();
+          stopwatch.start();
+        }
+      } while (publishTimeout > 0);
+    } catch (InterruptedException e) {
+      LOG.warn("Logs publish to Kafka interrupted.", e);
     }
+    return 0;
+  }
+
+  private ListenableFuture<Integer> doPublishLogs(Collection <ByteBuffer> logs) {
     // Nothing to publish, simply returns a completed future.
-    if (count == 0) {
+    if (logs.isEmpty()) {
       return Futures.immediateFuture(0);
     }
 
-    bufferedSize.set(0);
+    // If the publisher is not available, tries to create one.
+    KafkaPublisher.Preparer publisher = KafkaAppender.this.publisher.get();
+    if (publisher == null) {
+      try {
+        KafkaPublisher.Preparer preparer = kafkaClient.getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED,
+                                                                    Compression.SNAPPY).prepare(topic);
+        KafkaAppender.this.publisher.compareAndSet(null, preparer);
+        publisher = KafkaAppender.this.publisher.get();
+      } catch (Exception e) {
+        return Futures.immediateFailedFuture(e);
+      }
+    }
+
+    for (ByteBuffer buffer : logs) {
+      publisher.add(buffer, 0);
+    }
+
     return publisher.send();
   }
 
@@ -214,19 +271,14 @@ public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
     return new Runnable() {
       @Override
       public void run() {
-        Futures.addCallback(publishLogs(), new FutureCallback<Integer>() {
-          @Override
-          public void onSuccess(Integer result) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Log entries published, size=" + result);
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            LOG.error("Failed to push logs to kafka. Log entries dropped.", t);
+        try {
+          int published = publishLogs(2L, TimeUnit.SECONDS);
+          if (LOG.isDebugEnabled()) {
+            LOG.info("Published {} log messages to Kafka.", published);
           }
-        });
+        } catch (Exception e) {
+          LOG.error("Failed to push logs to Kafka. Log entries dropped.", e);
+        }
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
new file mode 100644
index 0000000..30a5a41
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.common.Services;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Verifies that a LogHandler added to a Twill application receives logs from both the AM and the runnable.
+ */
+public class LogHandlerTestRun extends BaseYarnTest {
+
+  @Test
+  public void testLogHandler() throws ExecutionException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(2);
+
+    LogHandler logHandler = new LogHandler() {
+      @Override
+      public void onLog(LogEntry logEntry) {
+        // Expect two messages: "Starting runnable LogRunnable..." (presumably from the AM) and "Running".
+        if (logEntry.getMessage().startsWith("Starting runnable " + LogRunnable.class.getSimpleName())) {
+          latch.countDown();
+        } else if (logEntry.getMessage().equals("Running")) {
+          latch.countDown();
+        }
+      }
+    };
+
+    TwillRunner runner = YarnTestUtils.getTwillRunner();
+    TwillController controller = runner.prepare(new LogRunnable())
+                                       .addLogHandler(logHandler)
+                                       .start();
+
+    Services.getCompletionFuture(controller).get();
+    latch.await(1, TimeUnit.SECONDS); // NOTE(review): result ignored, so a timeout does not fail the test
+  }
+
+  /**
+   * TwillRunnable used by the test; its run() emits the single log line "Running".
+   */
+  public static final class LogRunnable extends AbstractTwillRunnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogRunnable.class);
+
+
+    @Override
+    public void run() {
+      LOG.info("Running");
+    }
+
+    @Override
+    public void stop() {
+      // No-op: run() returns right after logging, so there is nothing to stop.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/146740b1/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index 912b713..bed613a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -31,7 +31,8 @@ import org.junit.runners.Suite;
                       DistributeShellTestRun.class,
                       LocalFileTestRun.class,
                       FailureRestartTestRun.class,
-                      ProvisionTimeoutTestRun.class
+                      ProvisionTimeoutTestRun.class,
+                      LogHandlerTestRun.class
                     })
 public final class YarnTestSuite {