You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:07:21 UTC

[75/98] [abbrv] incubator-apex-malhar git commit: MLHR-1889 #resolve #comment moved atomic renaming to its own method

MLHR-1889 #resolve #comment moved atomic renaming to its own method


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5d8382d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5d8382d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5d8382d5

Branch: refs/heads/master
Commit: 5d8382d51b12f0e47e83271b327d03cfe88b49b0
Parents: 73c8abf
Author: Chandni Singh <cs...@apache.org>
Authored: Wed Nov 4 11:40:39 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Nov 5 17:34:03 2015 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 61 +++++++++++++++-----
 1 file changed, 46 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5d8382d5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 744f024..fcbe1e8 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -21,7 +21,11 @@ package com.datatorrent.lib.io.fs;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -29,31 +33,43 @@ import javax.annotation.Nonnull;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import com.google.common.cache.*;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 
-import com.datatorrent.lib.counters.BasicCounters;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.counters.BasicCounters;
 
 /**
  * This base implementation for a fault tolerant HDFS output operator,
@@ -179,6 +195,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    * The file system used to write to.
    */
   protected transient FileSystem fs;
+  protected transient FileContext fileContext;
 
   protected short filePermission = 0777;
 
@@ -455,7 +472,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       fsOutput.close();
       inputStream.close();
 
-      FileContext fileContext = FileContext.getFileContext(fs.getUri());
       LOG.debug("active {} recovery {} ", filepath, recoveryFilePath);
 
       if (alwaysWriteToTmp) {
@@ -464,7 +480,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
         fileNameToTmpName.put(partFileName, recoveryFileName);
       } else {
         LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
-        fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
+        rename(recoveryFilePath, status.getPath());
       }
     } else {
       if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
@@ -642,6 +658,22 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   }
 
   /**
+   * Renames source path to destination atomically. This relies on the FileContext api. If
+   * the underlying filesystem doesn't have an {@link AbstractFileSystem} then this should be overridden.
+   *
+   * @param source      source path
+   * @param destination destination path
+   * @throws IOException if the FileContext cannot be obtained or the rename fails
+   */
+  protected void rename(Path source, Path destination) throws IOException
+  {
+    if (fileContext == null) { // fileContext is a transient field; create it on first use
+      fileContext = FileContext.getFileContext(fs.getUri()); // built from the URI of the operator's FileSystem
+    }
+    fileContext.rename(source, destination, Options.Rename.OVERWRITE); // always passes the OVERWRITE option
+  }
+
+  /**
    * Requests a file to be finalized. When it is writing to a rolling file, this will
    * request for finalizing the current open part and all the prev parts which weren't requested yet.
    *
@@ -1206,13 +1238,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   protected void finalizeFile(String fileName) throws IOException
   {
     String tmpFileName = fileNameToTmpName.get(fileName);
-    FileContext fileContext = FileContext.getFileContext(fs.getUri());
     Path srcPath = new Path(filePath + Path.SEPARATOR + tmpFileName);
     Path destPath = new Path(filePath + Path.SEPARATOR + fileName);
 
     if (!fs.exists(destPath)) {
       LOG.debug("rename from tmp {} actual {} ", tmpFileName, fileName);
-      fileContext.rename(srcPath, destPath);
+      rename(srcPath, destPath);
     } else if (fs.exists(srcPath)) {
       //if the destination and src both exists that means there was a failure between file rename and clearing the endOffset so
       //we just delete the tmp file.