Posted to commits@flume.apache.org by sz...@apache.org on 2018/10/15 09:19:55 UTC

flume git commit: FLUME-3223 Flume HDFS Sink should retry close prior recover lease

Repository: flume
Updated Branches:
  refs/heads/trunk c5168c902 -> 327a43dbe


FLUME-3223 Flume HDFS Sink should retry close prior recover lease

This is based on @mcsanady's original pull request #202.
I took the test changes from it but reworked the new feature implementation,
since the original failed some unit tests.
Previously, when a close failed, we immediately started lease recovery.
This PR introduces a background retry mechanism that reuses the already
existing "hdfs.closeTries" parameter. Unfortunately that parameter defaults
to infinite retries, which seems a bit too long to me.
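
For reference, a minimal sketch of how these knobs might appear in an agent
configuration (the hdfs.closeTries and hdfs.retryInterval property names come
from this change and the test updates below; the agent/sink names and values
are only illustrative):

  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = hdfs://namenode/flume/events
  # number of close attempts before falling back to lease recovery
  a1.sinks.k1.hdfs.closeTries = 3
  # seconds to wait between close/rename retry attempts
  a1.sinks.k1.hdfs.retryInterval = 180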

I also did a minimal code cleanup. The most important change is that the
HDFSWriter writer field in BucketWriter became final, which is essential for
referencing it from inner classes. Only some test setups kept it non-final;
I reworked those to inject the writer through the constructor (see the
sketch below).
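
As a minimal, hypothetical illustration of why the constructor-injected,
final field matters (this is not Flume's actual class, just the pattern):

  // Hypothetical sketch: a final, constructor-injected writer can be
  // referenced directly from anonymous inner classes (e.g. retry tasks),
  // with no mutable setter and no local copy required.
  interface Writer {
    void close() throws Exception;
  }

  class Bucket {
    private final Writer writer;      // final: safe to capture in inner classes

    Bucket(Writer writer) {           // tests pass a mock writer here
      this.writer = writer;
    }

    Runnable closeTask() {
      return new Runnable() {
        @Override
        public void run() {
          try {
            writer.close();           // no local copy of the field needed
          } catch (Exception e) {
            // a retry would be scheduled here
          }
        }
      };
    }
  }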

This closes #229
This closes #202

Reviewers: Peter Turcsanyi, Ferenc Szabo

(Endre Major via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/327a43db
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/327a43db
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/327a43db

Branch: refs/heads/trunk
Commit: 327a43dbefab57dc5d2da3ca9b7bb36ae4f743b7
Parents: c5168c9
Author: Endre Major <em...@cloudera.com>
Authored: Mon Oct 15 11:18:41 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Mon Oct 15 11:18:41 2018 +0200

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/BucketWriter.java    | 143 +++++++++++--------
 .../apache/flume/sink/hdfs/HDFSEventSink.java   |   8 +-
 .../apache/flume/sink/hdfs/MockHDFSWriter.java  |  30 +++-
 .../flume/sink/hdfs/TestBucketWriter.java       |  56 +++++++-
 .../hdfs/TestHDFSEventSinkOnMiniCluster.java    |   2 +
 5 files changed, 157 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/327a43db/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 2462d8a..c3941a6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -68,7 +68,7 @@ class BucketWriter {
   private static final Integer staticLock = new Integer(1);
   private Method isClosedMethod = null;
 
-  private HDFSWriter writer;
+  private final HDFSWriter writer;
   private final long rollInterval;
   private final long rollSize;
   private final long rollCount;
@@ -109,7 +109,7 @@ class BucketWriter {
   private boolean mockFsInjected = false;
 
   private final long retryInterval;
-  private final int maxRenameTries;
+  private final int maxRetries;
 
   // flag that the bucket writer was closed due to idling and thus shouldn't be
   // reopened. Not ideal, but avoids internals of owners
@@ -124,7 +124,7 @@ class BucketWriter {
       SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
       String onCloseCallbackPath, long callTimeout,
       ExecutorService callTimeoutPool, long retryInterval,
-      int maxCloseTries) {
+      int maxRetries) {
     this(rollInterval, rollSize, rollCount, batchSize,
             context, filePath, fileName, inUsePrefix,
             inUseSuffix, fileSuffix, codeC,
@@ -133,7 +133,7 @@ class BucketWriter {
             sinkCounter, idleTimeout, onCloseCallback,
             onCloseCallbackPath, callTimeout,
             callTimeoutPool, retryInterval,
-            maxCloseTries, new SystemClock());
+            maxRetries, new SystemClock());
   }
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
@@ -144,7 +144,7 @@ class BucketWriter {
            SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
            String onCloseCallbackPath, long callTimeout,
            ExecutorService callTimeoutPool, long retryInterval,
-           int maxCloseTries, Clock clock) {
+           int maxRetries, Clock clock) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
@@ -168,7 +168,7 @@ class BucketWriter {
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
     this.retryInterval = retryInterval;
-    this.maxRenameTries = maxCloseTries;
+    this.maxRetries = maxRetries;
     isOpen = false;
     isUnderReplicated = false;
     this.writer.configure(context);
@@ -180,12 +180,6 @@ class BucketWriter {
     mockFsInjected = true;
   }
 
-  @VisibleForTesting
-  void setMockStream(HDFSWriter dataWriter) {
-    this.writer = dataWriter;
-  }
-
-
   /**
    * Clear the class counters
    */
@@ -313,51 +307,87 @@ class BucketWriter {
    * method will not cause the bucket writer to be dereferenced from the HDFS
    * sink that owns it. This method should be used only when size or count
    * based rolling closes this file.
-   * @throws IOException On failure to rename if temp file exists.
-   * @throws InterruptedException
    */
-  public void close() throws IOException, InterruptedException {
+  public void close() throws InterruptedException {
     close(false);
   }
 
   private CallRunner<Void> createCloseCallRunner() {
     return new CallRunner<Void>() {
-      private final HDFSWriter localWriter = writer;
       @Override
       public Void call() throws Exception {
-        localWriter.close(); // could block
+        writer.close(); // could block
         return null;
       }
     };
   }
 
-  private Callable<Void> createScheduledRenameCallable() {
+  private class CloseHandler implements Callable<Void> {
+    private final String path = bucketPath;
+    private int closeTries = 0;
 
-    return new Callable<Void>() {
-      private final String path = bucketPath;
-      private final String finalPath = targetPath;
-      private FileSystem fs = fileSystem;
-      private int renameTries = 1; // one attempt is already done
+    @Override
+    public Void call() throws Exception {
+      close(false);
+      return null;
+    }
 
-      @Override
-      public Void call() throws Exception {
-        if (renameTries >= maxRenameTries) {
-          LOG.warn("Unsuccessfully attempted to rename " + path + " " +
-              maxRenameTries + " times. File may still be open.");
-          return null;
+    /**
+     * Tries to close the writer. Repeats the close if the maximum number
+     * of retries has not been reached and an immediate close is not requested.
+     * If all close attempts were unsuccessful, we try to recover the lease.
+     * @param immediate whether an immediate close is required
+     */
+    public void close(boolean immediate) {
+      closeTries++;
+      boolean shouldRetry = closeTries < maxRetries && !immediate;
+      try {
+        callWithTimeout(createCloseCallRunner());
+        sinkCounter.incrementConnectionClosedCount();
+      } catch (InterruptedException | IOException e) {
+        LOG.warn("Closing file: " + path + " failed. Will " +
+            "retry again in " + retryInterval + " seconds.", e);
+        if (timedRollerPool != null && !timedRollerPool.isTerminated()) {
+          if (shouldRetry) {
+            timedRollerPool.schedule(this, retryInterval, TimeUnit.SECONDS);
+          }
+        } else {
+          LOG.warn("Cannot retry close any more: timedRollerPool is null or terminated");
         }
-        renameTries++;
-        try {
-          renameBucket(path, finalPath, fs);
-        } catch (Exception e) {
-          LOG.warn("Renaming file: " + path + " failed. Will " +
-              "retry again in " + retryInterval + " seconds.", e);
-          timedRollerPool.schedule(this, retryInterval, TimeUnit.SECONDS);
-          return null;
+        if (!shouldRetry) {
+          LOG.warn("Unsuccessfully attempted to close " + path + " " +
+                  maxRetries + " times. Initializing lease recovery.");
+          sinkCounter.incrementConnectionFailedCount();
+          recoverLease();
         }
+      }
+    }
+  }
+
+  private class ScheduledRenameCallable implements Callable<Void> {
+    private final String path = bucketPath;
+    private final String finalPath = targetPath;
+    private FileSystem fs = fileSystem;
+    private int renameTries = 1; // one attempt is already done
+
+    @Override
+    public Void call() throws Exception {
+      if (renameTries >= maxRetries) {
+        LOG.warn("Unsuccessfully attempted to rename " + path + " " +
+                maxRetries + " times. File may still be open.");
         return null;
       }
-    };
+      renameTries++;
+      try {
+        renameBucket(path, finalPath, fs);
+      } catch (Exception e) {
+        LOG.warn("Renaming file: " + path + " failed. Will " +
+            "retry again in " + retryInterval + " seconds.", e);
+        timedRollerPool.schedule(this, retryInterval, TimeUnit.SECONDS);
+        return null;
+      }
+      return null;
+    }
   }
 
   /**
@@ -376,14 +406,16 @@ class BucketWriter {
     }
   }
 
+  public void close(boolean callCloseCallback) throws InterruptedException {
+    close(callCloseCallback, false);
+  }
+
   /**
    * Close the file handle and rename the temp file to the permanent filename.
    * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
-   * @throws IOException On failure to rename if temp file exists.
-   * @throws InterruptedException
    */
-  public void close(boolean callCloseCallback)
-      throws IOException, InterruptedException {
+  public void close(boolean callCloseCallback, boolean immediate)
+      throws InterruptedException {
     if (callCloseCallback) {
       if (closed.compareAndSet(false, true)) {
         runCloseAction(); //remove from the cache as soon as possible
@@ -391,11 +423,11 @@ class BucketWriter {
         LOG.warn("This bucketWriter is already closing or closed.");
       }
     }
-    doClose();
+    doClose(immediate);
   }
 
-  private synchronized void doClose()
-      throws IOException, InterruptedException {
+  private synchronized void doClose(boolean immediate)
+      throws InterruptedException {
     checkAndThrowInterruptedException();
     try {
       flush();
@@ -404,18 +436,8 @@ class BucketWriter {
     }
 
     LOG.info("Closing {}", bucketPath);
-    CallRunner<Void> closeCallRunner = createCloseCallRunner();
     if (isOpen) {
-      try {
-        callWithTimeout(closeCallRunner);
-        sinkCounter.incrementConnectionClosedCount();
-      } catch (IOException e) {
-        LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
-                 "). Exception follows.", e);
-        sinkCounter.incrementConnectionFailedCount();
-        // starting lease recovery process, see FLUME-3080
-        recoverLease();
-      }
+      new CloseHandler().close(immediate);
       isOpen = false;
     } else {
       LOG.info("HDFSWriter is already closed: {}", bucketPath);
@@ -440,7 +462,7 @@ class BucketWriter {
         LOG.warn("failed to rename() file (" + bucketPath +
                  "). Exception follows.", e);
         sinkCounter.incrementConnectionFailedCount();
-        final Callable<Void> scheduledRename = createScheduledRenameCallable();
+        final Callable<Void> scheduledRename = new ScheduledRenameCallable();
         timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS);
       }
     }
@@ -591,12 +613,7 @@ class BucketWriter {
       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
           bucketPath + ") and rethrowing exception.",
           e.getMessage());
-      try {
-        close(true);
-      } catch (IOException e2) {
-        LOG.warn("Caught IOException while closing file (" +
-             bucketPath + "). Exception follows.", e2);
-      }
+      close(true);
       throw e;
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/327a43db/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 8189ca8..a300996 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -160,8 +160,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable, BatchSi
         // return true
         try {
           eldest.getValue().close();
-        } catch (IOException e) {
-          LOG.warn(eldest.getKey().toString(), e);
         } catch (InterruptedException e) {
           LOG.warn(eldest.getKey().toString(), e);
           Thread.currentThread().interrupt();
@@ -462,16 +460,16 @@ public class HDFSEventSink extends AbstractSink implements Configurable, BatchSi
   BucketWriter initializeBucketWriter(String realPath,
       String realName, String lookupPath, HDFSWriter hdfsWriter,
       WriterCallback closeCallback) {
+    HDFSWriter actualHdfsWriter = mockFs == null ? hdfsWriter : mockWriter;
     BucketWriter bucketWriter = new BucketWriter(rollInterval,
         rollSize, rollCount,
         batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
-        suffix, codeC, compType, hdfsWriter, timedRollerPool,
+        suffix, codeC, compType, actualHdfsWriter, timedRollerPool,
         privExecutor, sinkCounter, idleTimeout, closeCallback,
         lookupPath, callTimeout, callTimeoutPool, retryInterval,
         tryCount);
     if (mockFs != null) {
       bucketWriter.setFileSystem(mockFs);
-      bucketWriter.setMockStream(mockWriter);
     }
     return bucketWriter;
   }
@@ -484,7 +482,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable, BatchSi
         LOG.info("Closing {}", entry.getKey());
 
         try {
-          entry.getValue().close();
+          entry.getValue().close(false, true);
         } catch (Exception ex) {
           LOG.warn("Exception while closing " + entry.getKey() + ". " +
                   "Exception follows.", ex);

http://git-wip-us.apache.org/repos/asf/flume/blob/327a43db/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
index 05c4316..c57496b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
@@ -19,10 +19,14 @@
 package org.apache.flume.sink.hdfs;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MockHDFSWriter implements HDFSWriter {
 
@@ -31,6 +35,17 @@ public class MockHDFSWriter implements HDFSWriter {
   private int bytesWritten = 0;
   private int eventsWritten = 0;
   private String filePath = null;
+  private static final Logger logger = LoggerFactory.getLogger(MockHDFSWriter.class);
+  private int numberOfRetriesRequired;
+  public volatile AtomicInteger currentCloseAttempts = new AtomicInteger(0);
+
+  public MockHDFSWriter(int numberOfRetriesRequired) {
+    this.numberOfRetriesRequired = numberOfRetriesRequired;
+  }
+
+  public MockHDFSWriter() {
+    this.numberOfRetriesRequired = 0;
+  }
 
   public int getFilesOpened() {
     return filesOpened;
@@ -52,13 +67,6 @@ public class MockHDFSWriter implements HDFSWriter {
     return filePath;
   }
 
-  public void clear() {
-    filesOpened = 0;
-    filesClosed = 0;
-    bytesWritten = 0;
-    eventsWritten = 0;
-  }
-
   public void configure(Context context) {
     // no-op
   }
@@ -85,6 +93,14 @@ public class MockHDFSWriter implements HDFSWriter {
 
   public void close() throws IOException {
     filesClosed++;
+    int curr = currentCloseAttempts.incrementAndGet();
+    logger.info("Attempting to close: '" + currentCloseAttempts + "' of '" +
+        numberOfRetriesRequired + "'");
+    if (curr >= numberOfRetriesRequired || numberOfRetriesRequired == 0) {
+      logger.info("closing file");
+    } else {
+      throw new IOException("MockIOException");
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/327a43db/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index f96aa96..64a89e1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -392,16 +392,22 @@ public class TestBucketWriter {
 
   @Test
   public void testSequenceFileRenameRetries() throws Exception {
-    SequenceFileRenameRetryCoreTest(1, true);
-    SequenceFileRenameRetryCoreTest(5, true);
-    SequenceFileRenameRetryCoreTest(2, true);
+    sequenceFileRenameRetryCoreTest(1, true);
+    sequenceFileRenameRetryCoreTest(5, true);
+    sequenceFileRenameRetryCoreTest(2, true);
 
-    SequenceFileRenameRetryCoreTest(1, false);
-    SequenceFileRenameRetryCoreTest(5, false);
-    SequenceFileRenameRetryCoreTest(2, false);
+    sequenceFileRenameRetryCoreTest(1, false);
+    sequenceFileRenameRetryCoreTest(5, false);
+    sequenceFileRenameRetryCoreTest(2, false);
   }
 
-  public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed)
+  @Test
+  public void testSequenceFileCloseRetries() throws Exception {
+    sequenceFileCloseRetryCoreTest(5);
+    sequenceFileCloseRetryCoreTest(1);
+  }
+
+  public void sequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed)
       throws Exception {
     String hdfsPath = "file:///tmp/flume-test." +
                       Calendar.getInstance().getTimeInMillis() +
@@ -447,6 +453,42 @@ public class TestBucketWriter {
                       bucketWriter.renameTries.get() == numberOfRetriesRequired);
   }
 
+  private void sequenceFileCloseRetryCoreTest(int numberOfRetriesRequired)
+      throws Exception {
+    String hdfsPath = "file:///tmp/flume-test." +
+        Calendar.getInstance().getTimeInMillis() +
+        "." + Thread.currentThread().getId();
+
+    Context context = new Context();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(hdfsPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+    context.put("hdfs.path", hdfsPath);
+    context.put("hdfs.closeTries", String.valueOf(numberOfRetriesRequired));
+    context.put("hdfs.rollCount", "1");
+    context.put("hdfs.retryInterval", "1");
+    context.put("hdfs.callTimeout", Long.toString(1000));
+    MockHDFSWriter mockHDFSWriter = new MockHDFSWriter(Integer.MAX_VALUE);
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    BucketWriter bucketWriter = new BucketWriter(
+        0, 0, 1, 1, ctx, hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null,
+        null, mockHDFSWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        executorService, 1, numberOfRetriesRequired);
+
+    Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+    bucketWriter.append(event);
+    bucketWriter.close(false);
+    TimeUnit.SECONDS.sleep(numberOfRetriesRequired + 2);
+    Assert.assertEquals("ExecutorService should be empty",
+        executorService.shutdownNow().size(), 0);
+    Assert.assertEquals("Expected " + numberOfRetriesRequired + " " +
+        "but got " + mockHDFSWriter.currentCloseAttempts,
+        mockHDFSWriter.currentCloseAttempts.get(), numberOfRetriesRequired);
+  }
+
   // Test that we don't swallow IOExceptions in secure mode. We should close the bucket writer
   // and rethrow the exception. Regression test for FLUME-3049.
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/327a43db/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
index 9400c9c..6dfac8a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
@@ -293,6 +293,7 @@ public class TestHDFSEventSinkOnMiniCluster {
     sinkCtx.put("hdfs.path", nnURL + outputDir);
     sinkCtx.put("hdfs.fileType", HDFSWriterFactory.DataStreamType);
     sinkCtx.put("hdfs.batchSize", Integer.toString(1));
+    sinkCtx.put("hdfs.retryInterval", "10"); //to speed up test
 
     HDFSEventSink sink = new HDFSEventSink();
     sink.setName("simpleHDFSTest-hdfs-sink");
@@ -531,6 +532,7 @@ public class TestHDFSEventSinkOnMiniCluster {
     ctx.put("hdfs.fileType", HDFSWriterFactory.DataStreamType);
     ctx.put("hdfs.batchSize", Integer.toString(1));
     ctx.put("hdfs.callTimeout", Integer.toString(1000));
+    ctx.put("hdfs.retryInterval", "10"); //to speed up test
 
     HDFSWriter hdfsWriter = new HDFSDataStream() {
       @Override