You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/06 20:33:22 UTC

incubator-apex-malhar git commit: APEXMALHAR-2080 APEXMALHAR-2079 fixed the typo and increased the default value of expireStreamAfterAccessMillis

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 2138512bb -> 72de84005


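APEXMALHAR-2080 APEXMALHAR-2079 fixed the typo in the field name expireStreamAfterAcessMillis (now expireStreamAfterAccessMillis) and increased its default value by computing it from STREAMING_WINDOW_SIZE_MILLIS instead of SPIN_MILLIS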


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/72de8400
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/72de8400
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/72de8400

Branch: refs/heads/master
Commit: 72de84005945ce6e588c85a699183380016791f2
Parents: 2138512
Author: Chandni Singh <cs...@apache.org>
Authored: Thu May 5 10:23:48 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu May 5 10:23:48 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java        | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/72de8400/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 15a208f..0195f7f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -257,7 +257,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    * operations, or during occasional read operations if writes are rare.<br/>
    * This isn't the most effective way but adds a little bit of optimization.
    */
-  private Long expireStreamAfterAcessMillis;
+  private Long expireStreamAfterAccessMillis;
   private final Set<String> filesWithOpenStreams;
 
   /**
@@ -321,8 +321,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   public void setup(Context.OperatorContext context)
   {
     LOG.debug("setup initiated");
-    if (expireStreamAfterAcessMillis == null) {
-      expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) *
+    if (expireStreamAfterAccessMillis == null) {
+      expireStreamAfterAccessMillis = (long)(context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) *
         context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT));
     }
     rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0);
@@ -343,7 +343,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
     //building cache
     RemovalListener<String, FSFilterStreamContext> removalListener = createCacheRemoveListener();
     CacheLoader<String, FSFilterStreamContext> loader = createCacheLoader();
-    streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis,
+    streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAccessMillis,
       TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
 
     LOG.debug("File system class: {}", fs.getClass());
@@ -608,7 +608,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
             String filename = notification.getKey();
             String partFileName = getPartFileNamePri(filename);
 
-            LOG.debug("closing {}", partFileName);
+            LOG.info("closing {}", partFileName);
             long start = System.currentTimeMillis();
 
             closeStream(streamContext);
@@ -616,6 +616,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
 
             totalWritingTime += System.currentTimeMillis() - start;
           } catch (IOException e) {
+            LOG.error("removing {}", notification.getValue(), e);
             throw new RuntimeException(e);
           }
         }
@@ -1292,7 +1293,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
 
   public Long getExpireStreamAfterAccessMillis()
   {
-    return expireStreamAfterAcessMillis;
+    return expireStreamAfterAccessMillis;
   }
 
   /**
@@ -1303,7 +1304,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
    */
   public void setExpireStreamAfterAccessMillis(Long millis)
   {
-    this.expireStreamAfterAcessMillis = millis;
+    this.expireStreamAfterAccessMillis = millis;
   }
 
 /**