You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:07:17 UTC

[71/98] [abbrv] incubator-apex-malhar git commit: MLHR-1886 #resolve #comment optimizing recovery and updating rotation states only when new part is open

MLHR-1886 #resolve #comment optimizing recovery and updating rotation states only when new part is open


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7156400b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7156400b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7156400b

Branch: refs/heads/master
Commit: 7156400be10c2fbda73a33ffe8c6fc03a580a569
Parents: c86e50a
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Oct 30 19:00:42 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Nov 5 17:16:15 2015 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 361 ++++++++++---------
 1 file changed, 199 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7156400b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 3819a33..09294a2 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -240,6 +240,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    * This isn't the most effective way but adds a little bit of optimization.
    */
   private Long expireStreamAfterAcessMillis;
+  private final Set<String> filesWithOpenStreams;
 
   /**
    * This input port receives incoming tuples.
@@ -264,7 +265,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
     }
   };
 
-  private static class RotationState {
+  private static class RotationState
+  {
     boolean notEmpty;
     boolean rotated;
   }
@@ -278,6 +280,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
     fileNameToTmpName = Maps.newHashMap();
     finalizedFiles = Maps.newTreeMap();
     finalizedPart = Maps.newHashMap();
+    filesWithOpenStreams = Sets.newHashSet();
   }
 
   /**
@@ -303,7 +306,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   {
     LOG.debug("setup initiated");
     if (expireStreamAfterAcessMillis == null) {
-      expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) * context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT));
+      expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) *
+        context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT));
     }
     rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0);
 
@@ -321,126 +325,16 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
 
     LOG.debug("FS class {}", fs.getClass());
 
-    //When an entry is removed from the cache, removal listener is notified and it closes the output stream.
-    RemovalListener<String, FSFilterStreamContext> removalListener = new RemovalListener<String, FSFilterStreamContext>()
-    {
-      @Override
-      public void onRemoval(@Nonnull RemovalNotification<String, FSFilterStreamContext> notification)
-      {
-        FSFilterStreamContext streamContext = notification.getValue();
-        if (streamContext != null) {
-
-          //FilterOutputStream filterStream = streamContext.getFilterStream();
-          try {
-            String filename = notification.getKey();
-            String partFileName = getPartFileNamePri(filename);
-
-            LOG.debug("closing {}", partFileName);
-            long start = System.currentTimeMillis();
-            streamContext.close();
-            //filterStream.close();
-            totalWritingTime += System.currentTimeMillis() - start;
-          }
-          catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      }
-    };
-
-    //Define cache
-    CacheLoader<String, FSFilterStreamContext> loader = new CacheLoader<String, FSFilterStreamContext>()
-    {
-      @Override
-      public FSFilterStreamContext load(@Nonnull String filename)
-      {
-        String partFileName = getPartFileNamePri(filename);
-        Path originalFilePath = new Path(filePath + Path.SEPARATOR + partFileName);
-
-        Path activeFilePath;
-        if (!alwaysWriteToTmp) {
-          activeFilePath = originalFilePath;
-        } else {
-          //MLHR-1776 : writing to tmp file
-          String tmpFileName = fileNameToTmpName.get(partFileName);
-          if (tmpFileName == null) {
-            tmpFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
-            fileNameToTmpName.put(partFileName, tmpFileName);
-          }
-          activeFilePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
-        }
-
-        FSDataOutputStream fsOutput;
-
-        boolean sawThisFileBefore = endOffsets.containsKey(filename);
-
-        try {
-          if (fs.exists(originalFilePath) || (alwaysWriteToTmp && fs.exists(activeFilePath))) {
-            if(sawThisFileBefore) {
-              FileStatus fileStatus = fs.getFileStatus(activeFilePath);
-              MutableLong endOffset = endOffsets.get(filename);
-
-              if (endOffset != null) {
-                endOffset.setValue(fileStatus.getLen());
-              }
-              else {
-                endOffsets.put(filename, new MutableLong(fileStatus.getLen()));
-              }
-
-              fsOutput = fs.append(activeFilePath);
-              LOG.debug("appending to {}", activeFilePath);
-            }
-            else {
-              //We never saw this file before and we don't want to append
-              //If the file is rolling we need to delete all its parts.
-              if(rollingFile) {
-                int part = 0;
-
-                while (true) {
-                  Path seenPartFilePath = new Path(filePath + Path.SEPARATOR + getPartFileName(filename, part));
-                  if (!fs.exists(seenPartFilePath)) {
-                    break;
-                  }
-
-                  fs.delete(seenPartFilePath, true);
-                  part = part + 1;
-                }
-
-                fsOutput = fs.create(activeFilePath, (short) replication);
-              }
-              else {
-                //Not rolling is easy, just delete the file and create it again.
-                fs.delete(activeFilePath, true);
-                if(alwaysWriteToTmp){
-                  //we need to delete original file if that exists
-                  if(fs.exists(originalFilePath)){
-                    fs.delete(originalFilePath, true);
-                  }
-                }
-                fsOutput = fs.create(activeFilePath, (short) replication);
-              }
-            }
-          }
-          else {
-            fsOutput = fs.create(activeFilePath, (short) replication);
-            fs.setPermission(activeFilePath, FsPermission.createImmutable(filePermission));
-          }
-
-          LOG.info("opened {}, active {}", partFileName, activeFilePath);
-          return new FSFilterStreamContext(fsOutput);
-        }
-        catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
+    //building cache
+    RemovalListener<String, FSFilterStreamContext> removalListener = createCacheRemoveListener();
+    CacheLoader<String, FSFilterStreamContext> loader = createCacheLoader();
+    streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis,
+      TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
 
-    streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis, TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
+    LOG.debug("File system class: {}", fs.getClass());
+    LOG.debug("end-offsets {}", endOffsets);
 
     try {
-      LOG.debug("File system class: {}", fs.getClass());
-      LOG.debug("end-offsets {}", endOffsets);
-
       //Restore the files in case they were corrupted and the operator was re-deployed.
       Path writerPath = new Path(filePath);
       if (fs.exists(writerPath)) {
@@ -467,7 +361,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
               byte[] buffer = new byte[COPY_BUFFER_SIZE];
               String recoveryFileName = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
               Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
-              FSDataOutputStream fsOutput = fs.create(recoveryFilePath, (short) replication);
+              FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
 
               while (inputStream.getPos() < offset) {
                 long remainingBytes = offset - inputStream.getPos();
@@ -492,9 +386,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
                 fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
               }
             } else {
-              if (alwaysWriteToTmp) {
+              if (alwaysWriteToTmp && filesWithOpenStreams.contains(seenFileName)) {
                 String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
-                FSDataOutputStream outputStream = fs.create(new Path(filePath + Path.SEPARATOR + currentTmp));
+                FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
                 IOUtils.copy(inputStream, outputStream);
                 outputStream.close();
                 fileNameToTmpName.put(seenFileNamePart, currentTmp);
@@ -506,8 +400,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       }
 
       if (rollingFile) {
-        //delete the left over future rolling files produced from the previous crashed instance
-        //of this operator.
+        //delete the leftover future rolling files produced by the previously crashed instance of this operator.
         for(String seenFileName: endOffsets.keySet()) {
           try {
             Integer fileOpenPart = this.openPart.get(seenFileName).getValue();
@@ -550,10 +443,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
               rotate(seenFileName);
             }
           }
-          catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-          catch (ExecutionException e) {
+          catch (IOException | ExecutionException e) {
             throw new RuntimeException(e);
           }
         }
@@ -561,17 +451,178 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
 
       LOG.debug("setup completed");
       LOG.debug("end-offsets {}", endOffsets);
-    }
-    catch (IOException e) {
+
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
 
     this.context = context;
 
-    fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN,
-                            new MutableLong());
-    fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS,
-                            new MutableLong());
+    fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong());
+    fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS, new MutableLong());
+  }
+
+  /**
+   * Creates the {@link CacheLoader} for loading an output stream when it is not present in the cache.
+   * @return cache loader
+   */
+  private CacheLoader<String, FSFilterStreamContext> createCacheLoader()
+  {
+    return new CacheLoader<String, FSFilterStreamContext>()
+    {
+      @Override
+      public FSFilterStreamContext load(@Nonnull String filename)
+      {
+        if (rollingFile) {
+          RotationState state = getRotationState(filename);
+          if (rollingFile && state.rotated) {
+            openPart.get(filename).add(1);
+            state.rotated = false;
+            MutableLong offset = endOffsets.get(filename);
+            offset.setValue(0);
+          }
+        }
+
+        String partFileName = getPartFileNamePri(filename);
+        Path originalFilePath = new Path(filePath + Path.SEPARATOR + partFileName);
+
+        Path activeFilePath;
+        if (!alwaysWriteToTmp) {
+          activeFilePath = originalFilePath;
+        } else {
+          //MLHR-1776 : writing to tmp file
+          String tmpFileName = fileNameToTmpName.get(partFileName);
+          if (tmpFileName == null) {
+            tmpFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+            fileNameToTmpName.put(partFileName, tmpFileName);
+          }
+          activeFilePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
+        }
+
+        FSDataOutputStream fsOutput;
+
+        boolean sawThisFileBefore = endOffsets.containsKey(filename);
+
+        try {
+          if (fs.exists(originalFilePath) || (alwaysWriteToTmp && fs.exists(activeFilePath))) {
+            if (sawThisFileBefore) {
+              FileStatus fileStatus = fs.getFileStatus(activeFilePath);
+              MutableLong endOffset = endOffsets.get(filename);
+
+              if (endOffset != null) {
+                endOffset.setValue(fileStatus.getLen());
+              } else {
+                endOffsets.put(filename, new MutableLong(fileStatus.getLen()));
+              }
+
+              fsOutput = openStream(activeFilePath, true);
+              LOG.debug("appending to {}", activeFilePath);
+            } else {
+              //We never saw this file before and we don't want to append
+              //If the file is rolling we need to delete all its parts.
+              if (rollingFile) {
+                int part = 0;
+
+                while (true) {
+                  Path seenPartFilePath = new Path(filePath + Path.SEPARATOR + getPartFileName(filename, part));
+                  if (!fs.exists(seenPartFilePath)) {
+                    break;
+                  }
+
+                  fs.delete(seenPartFilePath, true);
+                  part = part + 1;
+                }
+
+                fsOutput = openStream(activeFilePath, false);
+              } else {
+                //Not rolling is easy, just delete the file and create it again.
+                fs.delete(activeFilePath, true);
+                if (alwaysWriteToTmp) {
+                  //we need to delete original file if that exists
+                  if (fs.exists(originalFilePath)) {
+                    fs.delete(originalFilePath, true);
+                  }
+                }
+                fsOutput = openStream(activeFilePath, false);
+              }
+            }
+          } else {
+            fsOutput = openStream(activeFilePath, false);
+          }
+          filesWithOpenStreams.add(filename);
+
+          LOG.info("opened {}, active {}", partFileName, activeFilePath);
+          return new FSFilterStreamContext(fsOutput);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Creates the removal listener which is attached to the cache.
+   *
+   * @return cache entry removal listener.
+   */
+  private RemovalListener<String, FSFilterStreamContext> createCacheRemoveListener()
+  {
+    //When an entry is removed from the cache, removal listener is notified and it closes the output stream.
+    return new RemovalListener<String, FSFilterStreamContext>()
+    {
+      @Override
+      public void onRemoval(@Nonnull RemovalNotification<String, FSFilterStreamContext> notification)
+      {
+        FSFilterStreamContext streamContext = notification.getValue();
+        if (streamContext != null) {
+          try {
+            String filename = notification.getKey();
+            String partFileName = getPartFileNamePri(filename);
+
+            LOG.debug("closing {}", partFileName);
+            long start = System.currentTimeMillis();
+
+            closeStream(streamContext);
+            filesWithOpenStreams.remove(filename);
+
+            totalWritingTime += System.currentTimeMillis() - start;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    };
+  }
+
+  /**
+   * Opens the stream for the specified file path in either append mode or create mode.
+   *
+   * @param filepath this is the path of either the actual file or the corresponding temporary file.
+   * @param append   true for opening the file in append mode; false otherwise.
+   * @return output stream.
+   * @throws IOException if the file system fails to append to or create the file, or to set its permissions.
+   */
+  protected FSDataOutputStream openStream(Path filepath, boolean append) throws IOException
+  {
+    FSDataOutputStream fsOutput;
+    if (append) {
+      fsOutput = fs.append(filepath);
+    } else {
+      fsOutput = fs.create(filepath, (short)replication);
+      fs.setPermission(filepath, FsPermission.createImmutable(filePermission));
+    }
+    return fsOutput;
+  }
+
+  /**
+   * Closes the stream which has been removed from the cache.
+   *
+   * @param streamContext stream context which is removed from the cache.
+   * @throws IOException if closing the stream context fails.
+   */
+  protected void closeStream(FSFilterStreamContext streamContext) throws IOException
+  {
+    streamContext.close();
   }
 
   /**
@@ -592,12 +643,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
 
       MutableInt part = finalizedPart.get(fileName);
       if (part == null) {
-        part = new MutableInt();
+        part = new MutableInt(-1);
         finalizedPart.put(fileName, part);
       }
       MutableInt currentOpenPart = openPart.get(fileName);
 
-      for (int x = part.getValue() + 1; x < currentOpenPart.getValue(); x++) {
+      for (int x = part.getValue() + 1; x <= currentOpenPart.getValue(); x++) {
         String prevPartNotFinalized = getPartFileName(fileName, x);
         LOG.debug("request finalize {}", prevPartNotFinalized);
         filesPerWindow.add(prevPartNotFinalized);
@@ -619,12 +670,11 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
     //Close all the streams you can
     Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
     for(String seenFileName: openStreams.keySet()) {
-      //FilterOutputStream filterStream = openStreams.get(seenFileName).getFilterStream();
       FSFilterStreamContext fsFilterStreamContext = openStreams.get(seenFileName);
       try {
         long start = System.currentTimeMillis();
-        //filterStream.close();
-        fsFilterStreamContext.close();
+        closeStream(fsFilterStreamContext);
+        filesWithOpenStreams.remove(seenFileName);
         totalWritingTime += System.currentTimeMillis() - start;
       }
       catch (IOException ex) {
@@ -727,11 +777,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       }
 
       count.add(1);
-    }
-    catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-    catch (ExecutionException ex) {
+    } catch (IOException | ExecutionException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -743,24 +789,19 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    * @throws IOException
    * @throws ExecutionException
    */
-  protected void rotate(String fileName) throws IllegalArgumentException,
-                                                IOException,
-                                                ExecutionException
+  protected void rotate(String fileName) throws IllegalArgumentException, IOException, ExecutionException
   {
     requestFinalize(fileName);
     counts.remove(fileName);
     streamsCache.invalidate(fileName);
     MutableInt mi = openPart.get(fileName);
-    int rotatedFileIndex = mi.getValue();
-    mi.add(1);
-    LOG.debug("Part file index: {}", openPart);
-    endOffsets.get(fileName).setValue(0L);
-    String partFileName = getPartFileName(fileName, rotatedFileIndex);
+    LOG.debug("Part file rotated {} : {}", fileName, mi.getValue());
+
+    //TODO: remove this as rotateHook is deprecated.
+    String partFileName = getPartFileName(fileName, mi.getValue());
     rotateHook(partFileName);
 
-    if (rotationWindows > 0) {
-      getRotationState(fileName).rotated = true;
-    }
+    getRotationState(fileName).rotated = true;
   }
 
   private RotationState getRotationState(String fileName)
@@ -778,6 +819,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    * the name of the file part that has just completed closed.
    * @param finishedFile The name of the file part that has just completed and closed.
    */
+  @Deprecated
   protected void rotateHook(@SuppressWarnings("unused") String finishedFile)
   {
     //Do nothing by default
@@ -887,15 +929,10 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
           if (rotate) {
             try {
               rotate(filename);
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            } catch (ExecutionException e) {
+            } catch (IOException | ExecutionException e) {
               throw new RuntimeException(e);
             }
           }
-          if (rotationState != null) {
-            rotationState.rotated = false;
-          }
         }
       }
     }
@@ -1047,7 +1084,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
     TOTAL_TIME_WRITING_MILLISECONDS
   }
 
-  private class FSFilterStreamContext implements FilterStreamContext<FilterOutputStream>
+  protected class FSFilterStreamContext implements FilterStreamContext<FilterOutputStream>
   {
 
     private FSDataOutputStream outputStream;