You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ga...@apache.org on 2015/09/05 01:34:02 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1837 #resolve #comment creating a new file even when it is not corrupted and also closing stale streams

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 30e4b1ff6 -> d910102d7


MLHR-1837 #resolve #comment creating a new file even when it is not corrupted and also closing stale streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2a7d36dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2a7d36dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2a7d36dc

Branch: refs/heads/devel-3
Commit: 2a7d36dc4157aa6837b6e5706f0c014ce9389524
Parents: 30e4b1f
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Sep 3 22:48:25 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Sep 3 22:52:32 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 44 ++++++++++++-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 65 ++++++++++----------
 2 files changed, 75 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a7d36dc/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index a589751..aef0739 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.validation.constraints.Min;
@@ -31,6 +32,7 @@ import com.google.common.cache.*;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -226,6 +228,17 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   protected long currentWindow;
 
   /**
+   * The stream is expired (closed and evicted from cache) after the specified duration has passed since it was last
+   * accessed by a read or write.
+   * <p/>
+   * https://code.google.com/p/guava-libraries/wiki/CachesExplained <br/>
+   * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort.
+   * Instead, they perform small amounts of maintenance during write operations, or during occasional read operations if writes are rare.<br/>
+   * Cache expiry is therefore not the most effective way to close stale streams, but it adds a little bit of optimization.
+   */
+  private Long expireStreamAfterAcessMillis;
+
+  /**
    * This input port receives incoming tuples.
    */
   public final transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
@@ -286,6 +299,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   public void setup(Context.OperatorContext context)
   {
     LOG.debug("setup initiated");
+    if (expireStreamAfterAcessMillis == null) {
+      expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) * context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT));
+    }
     rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0);
 
     //Getting required file system instance.
@@ -416,13 +432,13 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       }
     };
 
-    streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).removalListener(removalListener).build(loader);
+    streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis, TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
 
     try {
       LOG.debug("File system class: {}", fs.getClass());
       LOG.debug("end-offsets {}", endOffsets);
 
-      //Restore the files in case they were corrupted and the operator
+      //Restore the files in case they were corrupted and the operator was re-deployed.
       Path writerPath = new Path(filePath);
       if (fs.exists(writerPath)) {
         for (String seenFileName : endOffsets.keySet()) {
@@ -473,6 +489,13 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
                 fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
               }
             } else {
+              if (alwaysWriteToTmp) {
+                String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+                FSDataOutputStream outputStream = fs.create(new Path(filePath + Path.SEPARATOR + currentTmp));
+                IOUtils.copy(inputStream, outputStream);
+                outputStream.close();
+                fileNameToTmpName.put(seenFileNamePart, currentTmp);
+              }
               inputStream.close();
             }
           }
@@ -1188,6 +1211,23 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   {
     return finalizedFiles;
   }
+
+  public Long getExpireStreamAfterAccessMillis()
+  {
+    return expireStreamAfterAcessMillis;
+  }
+
+  /**
+   * Sets how long a stream may remain unaccessed (by a read or write) before it is expired, i.e. closed and removed
+   * from the cache.
+   *
+   * @param millis time in millis; when left unset, setup() derives it from SPIN_MILLIS * CHECKPOINT_WINDOW_COUNT.
+   */
+  public void setExpireStreamAfterAccessMillis(Long millis)
+  {
+    this.expireStreamAfterAcessMillis = millis;
+  }
+
 /**
    * Return the filter to use. If this method returns a filter the filter is applied to data before the data is stored
    * in the file. If it returns null no filter is applied and data is written as is. Override this method to provide

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a7d36dc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 98f0152..b900af2 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -45,9 +45,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.RandomWordGenerator;
 import com.datatorrent.lib.util.TestUtils.TestInfo;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.*;
 
 import com.datatorrent.netlet.util.DTThrowable;
 
@@ -61,12 +59,10 @@ public class AbstractFileOutputOperatorTest
 
   @Rule public FSTestWatcher testMeta = new FSTestWatcher();
 
-  public static OperatorContextTestHelper.TestIdOperatorContext testOperatorContext =
-                new OperatorContextTestHelper.TestIdOperatorContext(0);
-
   public static class FSTestWatcher extends TestInfo
   {
     public boolean writeToTmp = false;
+    public OperatorContextTestHelper.TestIdOperatorContext testOperatorContext;
 
     @Override
     protected void starting(Description description)
@@ -74,6 +70,11 @@ public class AbstractFileOutputOperatorTest
       super.starting(description);
       try {
         FileUtils.forceMkdir(new File(getDir()));
+        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+        attributeMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 60);
+        attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
+
+        testOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(0, attributeMap);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -347,7 +348,7 @@ public class AbstractFileOutputOperatorTest
   {
     writer.setFilePath(testMeta.getDir());
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -428,7 +429,7 @@ public class AbstractFileOutputOperatorTest
     File meta = new File(testMeta.getDir());
     writer.setFilePath(meta.getAbsolutePath());
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -444,7 +445,7 @@ public class AbstractFileOutputOperatorTest
     writer.teardown();
 
     restoreCheckPoint(checkPointWriter, writer);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(1);
     writer.input.put(4);
@@ -635,7 +636,7 @@ public class AbstractFileOutputOperatorTest
     File meta = new File(testMeta.getDir());
     writer.setFilePath(meta.getAbsolutePath());
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -661,7 +662,7 @@ public class AbstractFileOutputOperatorTest
     File meta = new File(testMeta.getDir());
     writer.setFilePath(meta.getAbsolutePath());
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -762,7 +763,7 @@ public class AbstractFileOutputOperatorTest
     File meta = new File(testMeta.getDir());
     writer.setFilePath(meta.getAbsolutePath());
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -783,7 +784,7 @@ public class AbstractFileOutputOperatorTest
 
     restoreCheckPoint(checkPointWriter,
                       writer);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(2);
     writer.input.put(6);
@@ -889,7 +890,7 @@ public class AbstractFileOutputOperatorTest
     writer.setFilePath(testMeta.getDir());
     writer.setMaxLength(4);
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -960,7 +961,7 @@ public class AbstractFileOutputOperatorTest
     writer.setMaxLength(4);
     writer.setFilePath(testMeta.getDir());
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -978,7 +979,7 @@ public class AbstractFileOutputOperatorTest
 
     restoreCheckPoint(checkPointWriter,
                       writer);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(1);
     writer.input.put(3);
@@ -1003,7 +1004,7 @@ public class AbstractFileOutputOperatorTest
     writer.setFilePath(testMeta.getDir());
     writer.setMaxLength(4);
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -1028,7 +1029,7 @@ public class AbstractFileOutputOperatorTest
     restoreCheckPoint(checkPointWriter,
                       writer);
     LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(2);
     writer.input.put(5);
@@ -1043,7 +1044,7 @@ public class AbstractFileOutputOperatorTest
     writer.teardown();
 
     restoreCheckPoint(checkPointWriter1, writer);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
     writer.committed(2);
 
     String singleFilePath = testMeta.getDir() + File.separator + SINGLE_FILE;
@@ -1143,7 +1144,7 @@ public class AbstractFileOutputOperatorTest
     File meta = new File(testMeta.getDir());
     writer.setFilePath(meta.getAbsolutePath());
 
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -1211,7 +1212,7 @@ public class AbstractFileOutputOperatorTest
     File meta = new File(testMeta.getDir());
     writer.setFilePath(meta.getAbsolutePath());
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -1356,7 +1357,7 @@ public class AbstractFileOutputOperatorTest
     writer.setMaxLength(4);
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
 
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -1372,7 +1373,7 @@ public class AbstractFileOutputOperatorTest
 
     restoreCheckPoint(checkPointWriter,
                       writer);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(1);
     writer.input.put(2);
@@ -1476,7 +1477,7 @@ public class AbstractFileOutputOperatorTest
     writer.setFilePath(meta.getAbsolutePath());
     writer.setMaxLength(4);
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -1492,7 +1493,7 @@ public class AbstractFileOutputOperatorTest
 
     restoreCheckPoint(checkPointWriter,
                       writer);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(1);
     writer.input.put(4);
@@ -1528,7 +1529,7 @@ public class AbstractFileOutputOperatorTest
     writer.setFilePath(meta.getAbsolutePath());
     writer.setMaxLength(4);
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.process(0);
@@ -1544,7 +1545,7 @@ public class AbstractFileOutputOperatorTest
 
     restoreCheckPoint(checkPointWriter,
                       writer);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(1);
     writer.input.process(4);
@@ -1662,7 +1663,7 @@ public class AbstractFileOutputOperatorTest
   private void singleFileMultiRollingFailureHelper(SingleHDFSExactlyOnceWriter writer)
   {
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(0);
     writer.input.put(0);
@@ -1690,7 +1691,7 @@ public class AbstractFileOutputOperatorTest
 
     restoreCheckPoint(checkPointWriter,
                       writer);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     writer.beginWindow(1);
     writer.input.put(0);
@@ -1746,7 +1747,7 @@ public class AbstractFileOutputOperatorTest
     writer.setFilePath(testMeta.getDir());
     writer.setRotationWindows(30);
     writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     // Check that rotation doesn't happen prematurely
     for (int i = 0; i < 30; ++i) {
@@ -1823,7 +1824,7 @@ public class AbstractFileOutputOperatorTest
     
     writer.setFilePath(testMeta.getDir());
     writer.setAlwaysWriteToTmp(false);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     for (int i = 0; i < 10; ++i) {
       writer.beginWindow(i);
@@ -1982,7 +1983,7 @@ public class AbstractFileOutputOperatorTest
 
     writer.setFilePath(testMeta.getDir());
     writer.setAlwaysWriteToTmp(false);
-    writer.setup(testOperatorContext);
+    writer.setup(testMeta.testOperatorContext);
 
     for (int i = 0; i < 10; ++i) {
       writer.beginWindow(i);


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1837' into devel-3

Posted by ga...@apache.org.
Merge branch 'MLHR-1837' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d910102d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d910102d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d910102d

Branch: refs/heads/devel-3
Commit: d910102d7d0caf8d5b0a44b85448bb7d5774f67d
Parents: 30e4b1f 2a7d36d
Author: Gaurav Gupta <ga...@apache.org>
Authored: Fri Sep 4 16:31:45 2015 -0700
Committer: Gaurav Gupta <ga...@apache.org>
Committed: Fri Sep 4 16:31:45 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 44 ++++++++++++-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 65 ++++++++++----------
 2 files changed, 75 insertions(+), 34 deletions(-)
----------------------------------------------------------------------