You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ga...@apache.org on 2015/09/05 01:34:02 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1837 #resolve #comment
creating a new file even when it is not corrupted and also closing stale
streams
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 30e4b1ff6 -> d910102d7
MLHR-1837 #resolve #comment creating a new file even when it is not corrupted and also closing stale streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2a7d36dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2a7d36dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2a7d36dc
Branch: refs/heads/devel-3
Commit: 2a7d36dc4157aa6837b6e5706f0c014ce9389524
Parents: 30e4b1f
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Sep 3 22:48:25 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Sep 3 22:52:32 2015 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 44 ++++++++++++-
.../io/fs/AbstractFileOutputOperatorTest.java | 65 ++++++++++----------
2 files changed, 75 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a7d36dc/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index a589751..aef0739 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.validation.constraints.Min;
@@ -31,6 +32,7 @@ import com.google.common.cache.*;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,6 +228,17 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
protected long currentWindow;
/**
+ * The stream is expired (closed and evicted from cache) after the specified duration has passed since it was last
+ * accessed by a read or write.
+ * <p/>
+ * https://code.google.com/p/guava-libraries/wiki/CachesExplained <br/>
+ * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort.
+ * Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.<br/>
+ * Access-based expiry is therefore only a best-effort optimization for closing idle streams; it does not guarantee that a stale stream is closed promptly.
+ */
+ private Long expireStreamAfterAcessMillis;
+
+ /**
* This input port receives incoming tuples.
*/
public final transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
@@ -286,6 +299,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
public void setup(Context.OperatorContext context)
{
LOG.debug("setup initiated");
+ if (expireStreamAfterAcessMillis == null) {
+ expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) * context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT));
+ }
rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0);
//Getting required file system instance.
@@ -416,13 +432,13 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
};
- streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).removalListener(removalListener).build(loader);
+ streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis, TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
try {
LOG.debug("File system class: {}", fs.getClass());
LOG.debug("end-offsets {}", endOffsets);
- //Restore the files in case they were corrupted and the operator
+ //Restore the files in case they were corrupted and the operator was re-deployed.
Path writerPath = new Path(filePath);
if (fs.exists(writerPath)) {
for (String seenFileName : endOffsets.keySet()) {
@@ -473,6 +489,13 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
}
} else {
+ if (alwaysWriteToTmp) {
+ String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+ FSDataOutputStream outputStream = fs.create(new Path(filePath + Path.SEPARATOR + currentTmp));
+ IOUtils.copy(inputStream, outputStream);
+ outputStream.close();
+ fileNameToTmpName.put(seenFileNamePart, currentTmp);
+ }
inputStream.close();
}
}
@@ -1188,6 +1211,23 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
{
return finalizedFiles;
}
+
+ public Long getExpireStreamAfterAccessMillis()
+ {
+ return expireStreamAfterAcessMillis;
+ }
+
+ /**
+ * Sets how long a stream may go without being accessed by a read or write before it is expired
+ * (closed and removed from the cache).
+ *
+ * @param millis time in millis.
+ */
+ public void setExpireStreamAfterAccessMillis(Long millis)
+ {
+ this.expireStreamAfterAcessMillis = millis;
+ }
+
/**
* Return the filter to use. If this method returns a filter the filter is applied to data before the data is stored
* in the file. If it returns null no filter is applied and data is written as is. Override this method to provide
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a7d36dc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index 98f0152..b900af2 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -45,9 +45,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.RandomWordGenerator;
import com.datatorrent.lib.util.TestUtils.TestInfo;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.*;
import com.datatorrent.netlet.util.DTThrowable;
@@ -61,12 +59,10 @@ public class AbstractFileOutputOperatorTest
@Rule public FSTestWatcher testMeta = new FSTestWatcher();
- public static OperatorContextTestHelper.TestIdOperatorContext testOperatorContext =
- new OperatorContextTestHelper.TestIdOperatorContext(0);
-
public static class FSTestWatcher extends TestInfo
{
public boolean writeToTmp = false;
+ public OperatorContextTestHelper.TestIdOperatorContext testOperatorContext;
@Override
protected void starting(Description description)
@@ -74,6 +70,11 @@ public class AbstractFileOutputOperatorTest
super.starting(description);
try {
FileUtils.forceMkdir(new File(getDir()));
+ Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 60);
+ attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
+
+ testOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(0, attributeMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -347,7 +348,7 @@ public class AbstractFileOutputOperatorTest
{
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -428,7 +429,7 @@ public class AbstractFileOutputOperatorTest
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -444,7 +445,7 @@ public class AbstractFileOutputOperatorTest
writer.teardown();
restoreCheckPoint(checkPointWriter, writer);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(4);
@@ -635,7 +636,7 @@ public class AbstractFileOutputOperatorTest
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -661,7 +662,7 @@ public class AbstractFileOutputOperatorTest
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -762,7 +763,7 @@ public class AbstractFileOutputOperatorTest
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -783,7 +784,7 @@ public class AbstractFileOutputOperatorTest
restoreCheckPoint(checkPointWriter,
writer);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(2);
writer.input.put(6);
@@ -889,7 +890,7 @@ public class AbstractFileOutputOperatorTest
writer.setFilePath(testMeta.getDir());
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -960,7 +961,7 @@ public class AbstractFileOutputOperatorTest
writer.setMaxLength(4);
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -978,7 +979,7 @@ public class AbstractFileOutputOperatorTest
restoreCheckPoint(checkPointWriter,
writer);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(3);
@@ -1003,7 +1004,7 @@ public class AbstractFileOutputOperatorTest
writer.setFilePath(testMeta.getDir());
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -1028,7 +1029,7 @@ public class AbstractFileOutputOperatorTest
restoreCheckPoint(checkPointWriter,
writer);
LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(2);
writer.input.put(5);
@@ -1043,7 +1044,7 @@ public class AbstractFileOutputOperatorTest
writer.teardown();
restoreCheckPoint(checkPointWriter1, writer);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.committed(2);
String singleFilePath = testMeta.getDir() + File.separator + SINGLE_FILE;
@@ -1143,7 +1144,7 @@ public class AbstractFileOutputOperatorTest
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -1211,7 +1212,7 @@ public class AbstractFileOutputOperatorTest
File meta = new File(testMeta.getDir());
writer.setFilePath(meta.getAbsolutePath());
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -1356,7 +1357,7 @@ public class AbstractFileOutputOperatorTest
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -1372,7 +1373,7 @@ public class AbstractFileOutputOperatorTest
restoreCheckPoint(checkPointWriter,
writer);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(2);
@@ -1476,7 +1477,7 @@ public class AbstractFileOutputOperatorTest
writer.setFilePath(meta.getAbsolutePath());
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -1492,7 +1493,7 @@ public class AbstractFileOutputOperatorTest
restoreCheckPoint(checkPointWriter,
writer);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(4);
@@ -1528,7 +1529,7 @@ public class AbstractFileOutputOperatorTest
writer.setFilePath(meta.getAbsolutePath());
writer.setMaxLength(4);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.process(0);
@@ -1544,7 +1545,7 @@ public class AbstractFileOutputOperatorTest
restoreCheckPoint(checkPointWriter,
writer);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.process(4);
@@ -1662,7 +1663,7 @@ public class AbstractFileOutputOperatorTest
private void singleFileMultiRollingFailureHelper(SingleHDFSExactlyOnceWriter writer)
{
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(0);
writer.input.put(0);
@@ -1690,7 +1691,7 @@ public class AbstractFileOutputOperatorTest
restoreCheckPoint(checkPointWriter,
writer);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
writer.beginWindow(1);
writer.input.put(0);
@@ -1746,7 +1747,7 @@ public class AbstractFileOutputOperatorTest
writer.setFilePath(testMeta.getDir());
writer.setRotationWindows(30);
writer.setAlwaysWriteToTmp(testMeta.writeToTmp);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
// Check that rotation doesn't happen prematurely
for (int i = 0; i < 30; ++i) {
@@ -1823,7 +1824,7 @@ public class AbstractFileOutputOperatorTest
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(false);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
for (int i = 0; i < 10; ++i) {
writer.beginWindow(i);
@@ -1982,7 +1983,7 @@ public class AbstractFileOutputOperatorTest
writer.setFilePath(testMeta.getDir());
writer.setAlwaysWriteToTmp(false);
- writer.setup(testOperatorContext);
+ writer.setup(testMeta.testOperatorContext);
for (int i = 0; i < 10; ++i) {
writer.beginWindow(i);
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1837' into
devel-3
Posted by ga...@apache.org.
Merge branch 'MLHR-1837' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d910102d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d910102d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d910102d
Branch: refs/heads/devel-3
Commit: d910102d7d0caf8d5b0a44b85448bb7d5774f67d
Parents: 30e4b1f 2a7d36d
Author: Gaurav Gupta <ga...@apache.org>
Authored: Fri Sep 4 16:31:45 2015 -0700
Committer: Gaurav Gupta <ga...@apache.org>
Committed: Fri Sep 4 16:31:45 2015 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 44 ++++++++++++-
.../io/fs/AbstractFileOutputOperatorTest.java | 65 ++++++++++----------
2 files changed, 75 insertions(+), 34 deletions(-)
----------------------------------------------------------------------