You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:07:21 UTC
[75/98] [abbrv] incubator-apex-malhar git commit: MLHR-1889 #resolve
#comment moved atomic renaming to its own method
MLHR-1889 #resolve #comment moved atomic renaming to its own method
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5d8382d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5d8382d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5d8382d5
Branch: refs/heads/master
Commit: 5d8382d51b12f0e47e83271b327d03cfe88b49b0
Parents: 73c8abf
Author: Chandni Singh <cs...@apache.org>
Authored: Wed Nov 4 11:40:39 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Nov 5 17:34:03 2015 -0800
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 61 +++++++++++++++-----
1 file changed, 46 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5d8382d5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 744f024..fcbe1e8 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -21,7 +21,11 @@ package com.datatorrent.lib.io.fs;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -29,31 +33,43 @@ import javax.annotation.Nonnull;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.cache.*;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
-import com.datatorrent.lib.counters.BasicCounters;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.counters.BasicCounters;
/**
* This base implementation for a fault tolerant HDFS output operator,
@@ -179,6 +195,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* The file system used to write to.
*/
protected transient FileSystem fs;
+ protected transient FileContext fileContext;
protected short filePermission = 0777;
@@ -455,7 +472,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
fsOutput.close();
inputStream.close();
- FileContext fileContext = FileContext.getFileContext(fs.getUri());
LOG.debug("active {} recovery {} ", filepath, recoveryFilePath);
if (alwaysWriteToTmp) {
@@ -464,7 +480,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
fileNameToTmpName.put(partFileName, recoveryFileName);
} else {
LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
- fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
+ rename(recoveryFilePath, status.getPath());
}
} else {
if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
@@ -642,6 +658,22 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
/**
+   * Atomically renames the source path to the destination, overwriting an existing destination. This relies on the
+   * {@link FileContext} API; if the underlying file system has no {@link AbstractFileSystem}, override this method.
+   *
+   * @param source      source path
+   * @param destination destination path
+   * @throws IOException if the file context cannot be obtained or the rename fails
+   */
+  protected void rename(Path source, Path destination) throws IOException
+  {
+    if (fileContext == null) { // created lazily on first use; the field is transient
+      fileContext = FileContext.getFileContext(fs.getUri());
+    }
+    fileContext.rename(source, destination, Options.Rename.OVERWRITE); // OVERWRITE: replace destination if present
+  }
+
+ /**
* Requests a file to be finalized. When it is writing to a rolling file, this will
* request for finalizing the current open part and all the prev parts which weren't requested yet.
*
@@ -1206,13 +1238,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
protected void finalizeFile(String fileName) throws IOException
{
String tmpFileName = fileNameToTmpName.get(fileName);
- FileContext fileContext = FileContext.getFileContext(fs.getUri());
Path srcPath = new Path(filePath + Path.SEPARATOR + tmpFileName);
Path destPath = new Path(filePath + Path.SEPARATOR + fileName);
if (!fs.exists(destPath)) {
LOG.debug("rename from tmp {} actual {} ", tmpFileName, fileName);
- fileContext.rename(srcPath, destPath);
+ rename(srcPath, destPath);
} else if (fs.exists(srcPath)) {
//if the destination and src both exists that means there was a failure between file rename and clearing the endOffset so
//we just delete the tmp file.