You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/07/17 00:13:29 UTC

git commit: FLUME-2007. HDFS Sink should check if file is closed and retry if it is not.

Updated Branches:
  refs/heads/trunk a34cdb0ea -> 5b5470bd5


FLUME-2007. HDFS Sink should check if file is closed and retry if it is not.

(Ted Malaska via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5b5470bd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5b5470bd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5b5470bd

Branch: refs/heads/trunk
Commit: 5b5470bd5d3e94842032009c36788d4ae346674b
Parents: a34cdb0
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Jul 16 15:12:11 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Jul 16 15:12:59 2013 -0700

----------------------------------------------------------------------
 .../flume/sink/hdfs/AbstractHDFSWriter.java     |  64 +++++++++
 .../sink/hdfs/HDFSCompressedDataStream.java     |   2 +-
 .../apache/flume/sink/hdfs/HDFSDataStream.java  |   2 +-
 .../flume/sink/hdfs/HDFSSequenceFile.java       |   8 +-
 .../hdfs/MockFileSystemCloseRetryWrapper.java   | 142 +++++++++++++++++++
 ...MockFsDataOutputStreamCloseRetryWrapper.java |  73 ++++++++++
 .../sink/hdfs/TestUseRawLocalFileSystem.java    |  52 +++++++
 7 files changed, 340 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
index bc3b383..da0466d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -44,6 +45,8 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
   private Method refGetNumCurrentReplicas = null;
   private Method refGetDefaultReplication = null;
   private Integer configuredMinReplicas = null;
+  private Integer numberOfCloseRetries = null;
+  private long timeBetweenCloseRetries = Long.MAX_VALUE;
 
   final static Object [] NO_ARGS = new Object []{};
 
@@ -54,6 +57,17 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
       Preconditions.checkArgument(configuredMinReplicas >= 0,
           "hdfs.minBlockReplicas must be greater than or equal to 0");
     }
+    // hdfs.closeTries counts the initial close() as well, so retries = tries - 1.
+    numberOfCloseRetries = context.getInteger("hdfs.closeTries", 1) - 1;
+    if (numberOfCloseRetries > 0) { // even a single retry needs a finite delay
+      try {
+        timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", 10000l);
+      } catch (NumberFormatException e) {
+        timeBetweenCloseRetries = 10000L; // same default as above, not Long.MAX_VALUE
+        logger.warn("hdfs.callTimeout can not be parsed to a long: " + context.getString("hdfs.callTimeout"));
+      }
+      timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries/numberOfCloseRetries, 1000);
+    }
   }
 
   /**
@@ -97,6 +111,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
     this.destPath = destPath;
     this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);
     this.refGetDefaultReplication = reflectGetDefaultReplication(fs);
+
   }
 
   protected void unregisterCurrentStream() {
@@ -212,4 +227,53 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
     return m;
   }
 
+  /**
+   * Closes the stream, re-issuing close() while isFileClosed(Path) reports false (best effort).
+   * @param outputStream the stream to close
+   * @throws IOException declared for callers; failures are currently logged, not rethrown
+   */
+  protected void closeHDFSOutputStream(OutputStream outputStream)
+      throws IOException {
+    try {
+      outputStream.close();
+      if (numberOfCloseRetries > 0) {
+        try {
+          Method isFileClosedMethod = getIsFileClosedMethod();
+          int closeAttemptsMade = 0;
+          if (isFileClosedMethod != null) {
+            while (closeAttemptsMade < numberOfCloseRetries.intValue() &&
+                Boolean.FALSE.equals(isFileClosedMethod.invoke(fs, destPath))) {
+              closeAttemptsMade++;
+              logger.debug("Waiting: '" + timeBetweenCloseRetries + "' before retry close");
+              Thread.sleep(timeBetweenCloseRetries);
+              try {
+                outputStream.close();
+              } catch (IOException e) {
+                logger.error("Unable to close HDFS file: '" + destPath + "'", e);
+              }
+            }
+            if (closeAttemptsMade == numberOfCloseRetries.intValue()) { // NOTE(review): also fires when the final retry succeeded
+              logger.warn("Failed to close '" + destPath + "' in " +
+                numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " milliseconds");
+            }
+          }
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } // keep interrupt status
+          logger.error("Failed to close '" + destPath + "' in " +
+              numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " milliseconds", e);
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Unable to close HDFS file: '" + destPath + "'", e);
+    }
+  }
+
+  private Method getIsFileClosedMethod() { // reflective lookup of isFileClosed(Path) on fs
+    try {
+      return fs.getClass().getMethod("isFileClosed", Path.class);
+    } catch (Exception e) {
+      return null; // lookup failed; the caller skips the retry loop when this is null
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 2c2be6a..5518547 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -147,7 +147,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
     }
     fsOut.flush();
     fsOut.sync();
-    cmpOut.close();
+    closeHDFSOutputStream(cmpOut);
 
     unregisterCurrentStream();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index b8214be..e20d1ee 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -123,7 +123,7 @@ public class HDFSDataStream extends AbstractHDFSWriter {
     serializer.beforeClose();
     outStream.flush();
     outStream.sync();
-    outStream.close();
+    closeHDFSOutputStream(outStream);
 
     unregisterCurrentStream();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 0383744..5fe9f1b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -75,6 +75,12 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
+    open(dstPath, codeC, compType, conf, hdfs);
+  }
+
+  protected void open(Path dstPath, CompressionCodec codeC,
+      CompressionType compType, Configuration conf, FileSystem hdfs)
+          throws IOException {
     if(useRawLocalFileSystem) {
       if(hdfs instanceof LocalFileSystem) {
         hdfs = ((LocalFileSystem)hdfs).getRaw();
@@ -110,7 +116,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
   @Override
   public void close() throws IOException {
     writer.close();
-    outStream.close();
+    closeHDFSOutputStream(outStream);
 
     unregisterCurrentStream();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
new file mode 100644
index 0000000..b5d89e6
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockFileSystemCloseRetryWrapper extends FileSystem{
+  // Test double: delegates to a wrapped FileSystem and hands out close-retry mock streams.
+  private static final Logger logger =
+      LoggerFactory.getLogger(MockFileSystemCloseRetryWrapper.class);
+
+  FileSystem fs; // delegate that performs the real work
+  int numberOfClosesRequired; // passed through to every stream wrapper created here
+  boolean throwExceptionsOfFailedClose; // passed through to every stream wrapper created here
+  MockFsDataOutputStreamCloseRetryWrapper latestOutputStream; // set by append(...) and create(Path)
+
+  public MockFileSystemCloseRetryWrapper (FileSystem fs,
+      int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) {
+    this.fs = fs;
+    this.throwExceptionsOfFailedClose = throwExceptionsOfFailedClose;
+    this.numberOfClosesRequired = numberOfClosesRequired;
+  }
+
+  public MockFsDataOutputStreamCloseRetryWrapper getLastMockOutputStream() {
+    return latestOutputStream;
+  }
+
+  @Override
+  public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2)
+      throws IOException {
+
+    latestOutputStream = new MockFsDataOutputStreamCloseRetryWrapper(fs.append(arg0, arg1, arg2), numberOfClosesRequired, throwExceptionsOfFailedClose);
+
+    return latestOutputStream;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path arg0) throws IOException {
+    // Wrap the delegate's stream so the mock controls when close() succeeds.
+    latestOutputStream = new MockFsDataOutputStreamCloseRetryWrapper(fs.create(arg0), numberOfClosesRequired, throwExceptionsOfFailedClose);
+
+    return latestOutputStream;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2,
+      int arg3, short arg4, long arg5, Progressable arg6) throws IOException {
+    throw new IOException ("Not a real file system");
+    // This overload is not wrapped; callers always get the IOException above.
+  }
+
+  @Override
+  @Deprecated
+  public boolean delete(Path arg0) throws IOException {
+    return fs.delete(arg0);
+  }
+
+  @Override
+  public boolean delete(Path arg0, boolean arg1) throws IOException {
+    return fs.delete(arg0, arg1);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path arg0) throws IOException {
+    return fs.getFileStatus(arg0);
+  }
+
+  @Override
+  public URI getUri() {
+    return fs.getUri();
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return fs.getWorkingDirectory();
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path arg0) throws IOException {
+    return fs.listStatus(arg0);
+  }
+
+  @Override
+  public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException {
+    // Straight delegation, like the other pass-through methods.
+    return fs.mkdirs(arg0, arg1);
+  }
+
+  @Override
+  public FSDataInputStream open(Path arg0, int arg1) throws IOException {
+    return fs.open(arg0, arg1);
+  }
+
+  @Override
+  public boolean rename(Path arg0, Path arg1) throws IOException {
+
+    return fs.rename(arg0, arg1);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path arg0) {
+    fs.setWorkingDirectory(arg0);
+
+  }
+
+  public boolean isFileClosed(Path path) {
+    // NOTE(review): latestOutputStream is null until append(...) or create(Path) has been called.
+    logger.info("isFileClosed: '" + latestOutputStream.getCurrentCloseAttempts() + "' , '" + numberOfClosesRequired + "'");
+    // Closed once the latest stream has seen numberOfClosesRequired close() calls; path is ignored.
+    return latestOutputStream.getCurrentCloseAttempts() >= numberOfClosesRequired || numberOfClosesRequired == 0;
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
new file mode 100644
index 0000000..1d8c140
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockFsDataOutputStreamCloseRetryWrapper extends FSDataOutputStream{
+  // Test double whose close() only reaches the wrapped stream after enough attempts.
+  private static final Logger logger =
+      LoggerFactory.getLogger(MockFsDataOutputStreamCloseRetryWrapper.class);
+
+  int currentCloseAttempts = 0;          // close() calls seen so far
+  int numberOfClosesRequired;            // close() calls needed before the real close happens
+  boolean throwExceptionsOfFailedClose;  // true: a not-yet-successful close() throws IOException
+
+  public MockFsDataOutputStreamCloseRetryWrapper(FSDataOutputStream wrapMe,
+      int numberOfClosesRequired, boolean throwExceptionsOfFailedClose)
+      throws IOException {
+    super(wrapMe.getWrappedStream(), null);
+
+    this.numberOfClosesRequired = numberOfClosesRequired;
+    this.throwExceptionsOfFailedClose = throwExceptionsOfFailedClose;
+
+  }
+
+  public MockFsDataOutputStreamCloseRetryWrapper(OutputStream out,
+      Statistics stats) throws IOException {
+    super(out, stats);
+    // Retry fields keep their defaults here, so the first close() goes straight through.
+  }
+
+  @Override
+  public void close() throws IOException {
+    currentCloseAttempts++;
+    logger.info("Attempting to Close: '" + currentCloseAttempts + "' of '" + numberOfClosesRequired + "'");
+    // ">=" (was ">") matches MockFileSystemCloseRetryWrapper.isFileClosed; also covers required == 0.
+    if (currentCloseAttempts >= numberOfClosesRequired) {
+      logger.info("closing file");
+      super.close();
+    } else {
+      if (throwExceptionsOfFailedClose) {
+        logger.info("no closed and throwing exception");
+        throw new IOException("MockIOException");
+      } else {
+        logger.info("no closed and doing nothing");
+      }
+    }
+  }
+  public int getCurrentCloseAttempts() {
+    return currentCloseAttempts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
index ffbdde0..4476530 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
@@ -30,6 +30,9 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -102,4 +105,53 @@ public class TestUseRawLocalFileSystem {
     stream.sync();
     Assert.assertTrue(testFile.length() > 0);
   }
+
+  @Test
+  public void testSequenceFileCloseRetries() throws Exception { // args: closeTries, closes required, throw on failed close
+    SequenceFileCloseRetryCoreTest(3, 0, false);
+    SequenceFileCloseRetryCoreTest(3, 1, false);
+    SequenceFileCloseRetryCoreTest(3, 5, false);
+    // Same matrix, but failed close() calls throw instead of returning quietly.
+    SequenceFileCloseRetryCoreTest(3, 0, true);
+    SequenceFileCloseRetryCoreTest(3, 1, true);
+    SequenceFileCloseRetryCoreTest(3, 5, true);
+    // NOTE(review): the next two calls are identical; one may have been meant to pass false.
+    SequenceFileCloseRetryCoreTest(3, 2, true);
+    SequenceFileCloseRetryCoreTest(3, 2, true);
+    // hdfs.closeTries of 0 or 1 leaves no retries configured.
+    SequenceFileCloseRetryCoreTest(0, 0, true);
+    SequenceFileCloseRetryCoreTest(1, 0, true);
+  }
+
+
+  /**
+   * Writes one event through HDFSSequenceFile on the close-retry mock file system, closes it,
+   * and checks how many close() calls reached the mock stream.
+   * @param numberOfCloseRetriesToAttempt value stored under hdfs.closeTries
+   * @param numberOfClosesRequired close() calls the mock needs before it reports closed
+   * @param throwExceptionsOfFailedClose whether a failed mock close() throws IOException
+   */
+  public void SequenceFileCloseRetryCoreTest(int numberOfCloseRetriesToAttempt, int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) throws Exception {
+    String file = testFile.getCanonicalPath();
+    HDFSSequenceFile stream = new HDFSSequenceFile();
+    context.put("hdfs.useRawLocalFileSystem", "true");
+    context.put("hdfs.closeTries", String.valueOf(numberOfCloseRetriesToAttempt));
+    Configuration conf = new Configuration();
+    Path dstPath = new Path(file);
+    MockFileSystemCloseRetryWrapper mockFs = new MockFileSystemCloseRetryWrapper(dstPath.getFileSystem(conf), numberOfClosesRequired, throwExceptionsOfFailedClose);
+    stream.configure(context);
+    stream.open(dstPath, null, CompressionType.NONE, conf, mockFs);
+    stream.append(event);
+    stream.sync();
+    stream.close();
+
+    // When failed closes throw, the first IOException aborts retrying, so only one attempt is made.
+    int expectedNumberOfCloses = throwExceptionsOfFailedClose ? 1
+        : Math.max(Math.min(numberOfClosesRequired, numberOfCloseRetriesToAttempt), 1);
+    int actualNumberOfCloses = mockFs.getLastMockOutputStream().getCurrentCloseAttempts();
+    Assert.assertEquals("Unexpected number of close() attempts",
+        expectedNumberOfCloses, actualNumberOfCloses);
+  }
+
+
 }
\ No newline at end of file