You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/07/17 00:13:29 UTC
git commit: FLUME-2007. HDFS Sink should check if file is closed and
retry if it is not.
Updated Branches:
refs/heads/trunk a34cdb0ea -> 5b5470bd5
FLUME-2007. HDFS Sink should check if file is closed and retry if it is not.
(Ted Malaska via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5b5470bd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5b5470bd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5b5470bd
Branch: refs/heads/trunk
Commit: 5b5470bd5d3e94842032009c36788d4ae346674b
Parents: a34cdb0
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Jul 16 15:12:11 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Jul 16 15:12:59 2013 -0700
----------------------------------------------------------------------
.../flume/sink/hdfs/AbstractHDFSWriter.java | 64 +++++++++
.../sink/hdfs/HDFSCompressedDataStream.java | 2 +-
.../apache/flume/sink/hdfs/HDFSDataStream.java | 2 +-
.../flume/sink/hdfs/HDFSSequenceFile.java | 8 +-
.../hdfs/MockFileSystemCloseRetryWrapper.java | 142 +++++++++++++++++++
...MockFsDataOutputStreamCloseRetryWrapper.java | 73 ++++++++++
.../sink/hdfs/TestUseRawLocalFileSystem.java | 52 +++++++
7 files changed, 340 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
index bc3b383..da0466d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -44,6 +45,8 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
private Method refGetNumCurrentReplicas = null;
private Method refGetDefaultReplication = null;
private Integer configuredMinReplicas = null;
+ private Integer numberOfCloseRetries = null;
+ private long timeBetweenCloseRetries = Long.MAX_VALUE;
final static Object [] NO_ARGS = new Object []{};
@@ -54,6 +57,17 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
Preconditions.checkArgument(configuredMinReplicas >= 0,
"hdfs.minBlockReplicas must be greater than or equal to 0");
}
+ numberOfCloseRetries = context.getInteger("hdfs.closeTries", 1) - 1;
+
+ if (numberOfCloseRetries > 1) {
+ try {
+ timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", 10000l);
+ } catch (NumberFormatException e) {
+ logger.warn("hdfs.callTimeout can not be parsed to a long: " + context.getLong("hdfs.callTimeout"));
+ }
+ timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries/numberOfCloseRetries, 1000);
+ }
+
}
/**
@@ -97,6 +111,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
this.destPath = destPath;
this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);
this.refGetDefaultReplication = reflectGetDefaultReplication(fs);
+
}
protected void unregisterCurrentStream() {
@@ -212,4 +227,53 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
return m;
}
+ /**
+ * This will
+ * @param outputStream
+ * @throws IOException
+ */
+ protected void closeHDFSOutputStream(OutputStream outputStream)
+ throws IOException {
+ try {
+ outputStream.close();
+
+ if (numberOfCloseRetries > 0) {
+ try {
+ Method isFileClosedMethod = getIsFileClosedMethod();
+ int closeAttemptsMade = 0;
+ if (isFileClosedMethod != null) {
+ while (closeAttemptsMade < numberOfCloseRetries.intValue() &&
+ Boolean.FALSE.equals(isFileClosedMethod.invoke(fs, destPath))) {
+ closeAttemptsMade++;
+ logger.debug("Waiting: '" + timeBetweenCloseRetries + "' before retry close");
+ Thread.sleep(timeBetweenCloseRetries);
+ try {
+ outputStream.close();
+ } catch (IOException e) {
+ logger.error("Unable to close HDFS file: '" + destPath + "'");
+ }
+ }
+ if (closeAttemptsMade == numberOfCloseRetries.intValue()) {
+ logger.warn("Failed to close '" + destPath + "' is " +
+ numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds");
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Failed to close '" + destPath + "' is " +
+ numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds", e);
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Unable to close HDFS file: '" + destPath + "'");
+ }
+ }
+
+ private Method getIsFileClosedMethod() {
+ try {
+ return fs.getClass().getMethod("isFileClosed", Path.class);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 2c2be6a..5518547 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -147,7 +147,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
}
fsOut.flush();
fsOut.sync();
- cmpOut.close();
+ closeHDFSOutputStream(cmpOut);
unregisterCurrentStream();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index b8214be..e20d1ee 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -123,7 +123,7 @@ public class HDFSDataStream extends AbstractHDFSWriter {
serializer.beforeClose();
outStream.flush();
outStream.sync();
- outStream.close();
+ closeHDFSOutputStream(outStream);
unregisterCurrentStream();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 0383744..5fe9f1b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -75,6 +75,12 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
Configuration conf = new Configuration();
Path dstPath = new Path(filePath);
FileSystem hdfs = dstPath.getFileSystem(conf);
+ open(dstPath, codeC, compType, conf, hdfs);
+ }
+
+ protected void open(Path dstPath, CompressionCodec codeC,
+ CompressionType compType, Configuration conf, FileSystem hdfs)
+ throws IOException {
if(useRawLocalFileSystem) {
if(hdfs instanceof LocalFileSystem) {
hdfs = ((LocalFileSystem)hdfs).getRaw();
@@ -110,7 +116,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
@Override
public void close() throws IOException {
writer.close();
- outStream.close();
+ closeHDFSOutputStream(outStream);
unregisterCurrentStream();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
new file mode 100644
index 0000000..b5d89e6
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockFileSystemCloseRetryWrapper extends FileSystem{
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(MockFileSystemCloseRetryWrapper.class);
+
+ FileSystem fs;
+ int numberOfClosesRequired;
+ boolean throwExceptionsOfFailedClose;
+ MockFsDataOutputStreamCloseRetryWrapper latestOutputStream;
+
+ public MockFileSystemCloseRetryWrapper (FileSystem fs,
+ int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) {
+ this.fs = fs;
+ this.throwExceptionsOfFailedClose = throwExceptionsOfFailedClose;
+ this.numberOfClosesRequired = numberOfClosesRequired;
+ }
+
+ public MockFsDataOutputStreamCloseRetryWrapper getLastMockOutputStream() {
+ return latestOutputStream;
+ }
+
+ @Override
+ public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2)
+ throws IOException {
+
+ latestOutputStream = new MockFsDataOutputStreamCloseRetryWrapper(fs.append(arg0, arg1, arg2), numberOfClosesRequired, throwExceptionsOfFailedClose);
+
+ return latestOutputStream;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path arg0) throws IOException {
+ //throw new IOException ("HI there2");
+ latestOutputStream = new MockFsDataOutputStreamCloseRetryWrapper(fs.create(arg0), numberOfClosesRequired, throwExceptionsOfFailedClose);
+
+ return latestOutputStream;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2,
+ int arg3, short arg4, long arg5, Progressable arg6) throws IOException {
+ throw new IOException ("Not a real file system");
+ //return new MockFsDataOutputStreamCloseRetryWrapper(fs.create(arg0, arg1, arg2, arg3, arg4, arg5, arg6), numberOfClosesRequired, throwExceptionsOfFailedClose);
+ }
+
+ @Override
+ @Deprecated
+ public boolean delete(Path arg0) throws IOException {
+ return fs.delete(arg0);
+ }
+
+ @Override
+ public boolean delete(Path arg0, boolean arg1) throws IOException {
+ return fs.delete(arg0, arg1);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path arg0) throws IOException {
+ return fs.getFileStatus(arg0);
+ }
+
+ @Override
+ public URI getUri() {
+ return fs.getUri();
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return fs.getWorkingDirectory();
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path arg0) throws IOException {
+ return fs.listStatus(arg0);
+ }
+
+ @Override
+ public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException {
+ // TODO Auto-generated method stub
+ return fs.mkdirs(arg0, arg1);
+ }
+
+ @Override
+ public FSDataInputStream open(Path arg0, int arg1) throws IOException {
+ return fs.open(arg0, arg1);
+ }
+
+ @Override
+ public boolean rename(Path arg0, Path arg1) throws IOException {
+
+ return fs.rename(arg0, arg1);
+ }
+
+ @Override
+ public void setWorkingDirectory(Path arg0) {
+ fs.setWorkingDirectory(arg0);
+
+ }
+
+ public boolean isFileClosed(Path path) {
+
+ logger.info("isFileClosed: '" + latestOutputStream.getCurrentCloseAttempts() + "' , '" + numberOfClosesRequired + "'");
+
+ return latestOutputStream.getCurrentCloseAttempts() >= numberOfClosesRequired || numberOfClosesRequired == 0;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
new file mode 100644
index 0000000..1d8c140
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
@@ -0,0 +1,73 @@
+/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockFsDataOutputStreamCloseRetryWrapper extends FSDataOutputStream{
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(MockFsDataOutputStreamCloseRetryWrapper.class);
+
+ int currentCloseAttempts = 0;
+ int numberOfClosesRequired;
+ boolean throwExceptionsOfFailedClose;
+
+ public MockFsDataOutputStreamCloseRetryWrapper(FSDataOutputStream wrapMe,
+ int numberOfClosesRequired, boolean throwExceptionsOfFailedClose)
+ throws IOException {
+ super(wrapMe.getWrappedStream(), null);
+
+ this.numberOfClosesRequired = numberOfClosesRequired;
+ this.throwExceptionsOfFailedClose = throwExceptionsOfFailedClose;
+
+ }
+
+ public MockFsDataOutputStreamCloseRetryWrapper(OutputStream out,
+ Statistics stats) throws IOException {
+ super(out, stats);
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentCloseAttempts++;
+ logger.info("Attempting to Close: '" + currentCloseAttempts + "' of '" + numberOfClosesRequired + "'");
+ if (currentCloseAttempts > numberOfClosesRequired || numberOfClosesRequired == 0) {
+ logger.info("closing file");
+ super.close();
+ } else {
+ if (throwExceptionsOfFailedClose) {
+ logger.info("no closed and throwing exception");
+ throw new IOException("MockIOException");
+ } else {
+ logger.info("no closed and doing nothing");
+ }
+ }
+ }
+
+ public int getCurrentCloseAttempts() {
+ return currentCloseAttempts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
index ffbdde0..4476530 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
@@ -30,6 +30,9 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -102,4 +105,53 @@ public class TestUseRawLocalFileSystem {
stream.sync();
Assert.assertTrue(testFile.length() > 0);
}
+
+ @Test
+ public void testSequenceFileCloseRetries() throws Exception {
+ SequenceFileCloseRetryCoreTest(3, 0, false);
+ SequenceFileCloseRetryCoreTest(3, 1, false);
+ SequenceFileCloseRetryCoreTest(3, 5, false);
+
+ SequenceFileCloseRetryCoreTest(3, 0, true);
+ SequenceFileCloseRetryCoreTest(3, 1, true);
+ SequenceFileCloseRetryCoreTest(3, 5, true);
+
+ SequenceFileCloseRetryCoreTest(3, 2, true);
+ SequenceFileCloseRetryCoreTest(3, 2, true);
+
+ SequenceFileCloseRetryCoreTest(0, 0, true);
+ SequenceFileCloseRetryCoreTest(1, 0, true);
+ }
+
+
+ public void SequenceFileCloseRetryCoreTest(int numberOfCloseRetriesToAttempt, int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) throws Exception {
+ String file = testFile.getCanonicalPath();
+ HDFSSequenceFile stream = new HDFSSequenceFile();
+ context.put("hdfs.useRawLocalFileSystem", "true");
+ context.put("hdfs.closeTries", String.valueOf(numberOfCloseRetriesToAttempt));
+ Configuration conf = new Configuration();
+ Path dstPath = new Path(file);
+ MockFileSystemCloseRetryWrapper mockFs = new MockFileSystemCloseRetryWrapper(dstPath.getFileSystem(conf), numberOfClosesRequired, throwExceptionsOfFailedClose);
+ stream.configure(context);
+ stream.open(dstPath, null, CompressionType.NONE, conf, mockFs);
+ stream.append(event);
+ stream.sync();
+
+ stream.close();
+
+ if (throwExceptionsOfFailedClose) {
+ int expectedNumberOfCloses = 1;
+ Assert.assertTrue("Expected " + expectedNumberOfCloses + " but got " + mockFs.getLastMockOutputStream().getCurrentCloseAttempts() , mockFs.getLastMockOutputStream().currentCloseAttempts == expectedNumberOfCloses);
+ } else {
+ int expectedNumberOfCloses = Math.max(Math.min(numberOfClosesRequired, numberOfCloseRetriesToAttempt), 1);
+ Assert.assertTrue("Expected " + expectedNumberOfCloses + " but got " + mockFs.getLastMockOutputStream().getCurrentCloseAttempts() , mockFs.getLastMockOutputStream().currentCloseAttempts == expectedNumberOfCloses);
+ }
+
+
+
+
+
+ }
+
+
}
\ No newline at end of file