You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/06 20:33:22 UTC
incubator-apex-malhar git commit: APEXMALHAR-2080 APEXMALHAR-2079
fixed the typo and increased the default value of
expireStreamAfterAccessMillis
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master 2138512bb -> 72de84005
APEXMALHAR-2080 APEXMALHAR-2079 fixed the typo and increased the default value of expireStreamAfterAccessMillis
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/72de8400
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/72de8400
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/72de8400
Branch: refs/heads/master
Commit: 72de84005945ce6e588c85a699183380016791f2
Parents: 2138512
Author: Chandni Singh <cs...@apache.org>
Authored: Thu May 5 10:23:48 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu May 5 10:23:48 2016 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/72de8400/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 15a208f..0195f7f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -257,7 +257,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* operations, or during occasional read operations if writes are rare.<br/>
* This isn't the most effective way but adds a little bit of optimization.
*/
- private Long expireStreamAfterAcessMillis;
+ private Long expireStreamAfterAccessMillis;
private final Set<String> filesWithOpenStreams;
/**
@@ -321,8 +321,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
public void setup(Context.OperatorContext context)
{
LOG.debug("setup initiated");
- if (expireStreamAfterAcessMillis == null) {
- expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) *
+ if (expireStreamAfterAccessMillis == null) {
+ expireStreamAfterAccessMillis = (long)(context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) *
context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT));
}
rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0);
@@ -343,7 +343,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
//building cache
RemovalListener<String, FSFilterStreamContext> removalListener = createCacheRemoveListener();
CacheLoader<String, FSFilterStreamContext> loader = createCacheLoader();
- streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis,
+ streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAccessMillis,
TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
LOG.debug("File system class: {}", fs.getClass());
@@ -608,7 +608,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
String filename = notification.getKey();
String partFileName = getPartFileNamePri(filename);
- LOG.debug("closing {}", partFileName);
+ LOG.info("closing {}", partFileName);
long start = System.currentTimeMillis();
closeStream(streamContext);
@@ -616,6 +616,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
totalWritingTime += System.currentTimeMillis() - start;
} catch (IOException e) {
+ LOG.error("removing {}", notification.getValue(), e);
throw new RuntimeException(e);
}
}
@@ -1292,7 +1293,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
public Long getExpireStreamAfterAccessMillis()
{
- return expireStreamAfterAcessMillis;
+ return expireStreamAfterAccessMillis;
}
/**
@@ -1303,7 +1304,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
*/
public void setExpireStreamAfterAccessMillis(Long millis)
{
- this.expireStreamAfterAcessMillis = millis;
+ this.expireStreamAfterAccessMillis = millis;
}
/**