You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the word "here" in the original page and is not preserved in this version.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:07:17 UTC
[71/98] [abbrv] incubator-apex-malhar git commit: MLHR-1886 #resolve
#comment optimizing recovery and updating rotation states only when a new part
is opened
MLHR-1886 #resolve #comment optimizing recovery and updating rotation states only when a new part is opened
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7156400b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7156400b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7156400b
Branch: refs/heads/master
Commit: 7156400be10c2fbda73a33ffe8c6fc03a580a569
Parents: c86e50a
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Oct 30 19:00:42 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Nov 5 17:16:15 2015 -0800
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 361 ++++++++++---------
1 file changed, 199 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7156400b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 3819a33..09294a2 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -240,6 +240,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* This isn't the most effective way but adds a little bit of optimization.
*/
private Long expireStreamAfterAcessMillis;
+ private final Set<String> filesWithOpenStreams;
/**
* This input port receives incoming tuples.
@@ -264,7 +265,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
};
- private static class RotationState {
+ private static class RotationState
+ {
boolean notEmpty;
boolean rotated;
}
@@ -278,6 +280,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
fileNameToTmpName = Maps.newHashMap();
finalizedFiles = Maps.newTreeMap();
finalizedPart = Maps.newHashMap();
+ filesWithOpenStreams = Sets.newHashSet();
}
/**
@@ -303,7 +306,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
{
LOG.debug("setup initiated");
if (expireStreamAfterAcessMillis == null) {
- expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) * context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT));
+ expireStreamAfterAcessMillis = (long)(context.getValue(OperatorContext.SPIN_MILLIS) *
+ context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT));
}
rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0);
@@ -321,126 +325,16 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
LOG.debug("FS class {}", fs.getClass());
- //When an entry is removed from the cache, removal listener is notified and it closes the output stream.
- RemovalListener<String, FSFilterStreamContext> removalListener = new RemovalListener<String, FSFilterStreamContext>()
- {
- @Override
- public void onRemoval(@Nonnull RemovalNotification<String, FSFilterStreamContext> notification)
- {
- FSFilterStreamContext streamContext = notification.getValue();
- if (streamContext != null) {
-
- //FilterOutputStream filterStream = streamContext.getFilterStream();
- try {
- String filename = notification.getKey();
- String partFileName = getPartFileNamePri(filename);
-
- LOG.debug("closing {}", partFileName);
- long start = System.currentTimeMillis();
- streamContext.close();
- //filterStream.close();
- totalWritingTime += System.currentTimeMillis() - start;
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- };
-
- //Define cache
- CacheLoader<String, FSFilterStreamContext> loader = new CacheLoader<String, FSFilterStreamContext>()
- {
- @Override
- public FSFilterStreamContext load(@Nonnull String filename)
- {
- String partFileName = getPartFileNamePri(filename);
- Path originalFilePath = new Path(filePath + Path.SEPARATOR + partFileName);
-
- Path activeFilePath;
- if (!alwaysWriteToTmp) {
- activeFilePath = originalFilePath;
- } else {
- //MLHR-1776 : writing to tmp file
- String tmpFileName = fileNameToTmpName.get(partFileName);
- if (tmpFileName == null) {
- tmpFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
- fileNameToTmpName.put(partFileName, tmpFileName);
- }
- activeFilePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
- }
-
- FSDataOutputStream fsOutput;
-
- boolean sawThisFileBefore = endOffsets.containsKey(filename);
-
- try {
- if (fs.exists(originalFilePath) || (alwaysWriteToTmp && fs.exists(activeFilePath))) {
- if(sawThisFileBefore) {
- FileStatus fileStatus = fs.getFileStatus(activeFilePath);
- MutableLong endOffset = endOffsets.get(filename);
-
- if (endOffset != null) {
- endOffset.setValue(fileStatus.getLen());
- }
- else {
- endOffsets.put(filename, new MutableLong(fileStatus.getLen()));
- }
-
- fsOutput = fs.append(activeFilePath);
- LOG.debug("appending to {}", activeFilePath);
- }
- else {
- //We never saw this file before and we don't want to append
- //If the file is rolling we need to delete all its parts.
- if(rollingFile) {
- int part = 0;
-
- while (true) {
- Path seenPartFilePath = new Path(filePath + Path.SEPARATOR + getPartFileName(filename, part));
- if (!fs.exists(seenPartFilePath)) {
- break;
- }
-
- fs.delete(seenPartFilePath, true);
- part = part + 1;
- }
-
- fsOutput = fs.create(activeFilePath, (short) replication);
- }
- else {
- //Not rolling is easy, just delete the file and create it again.
- fs.delete(activeFilePath, true);
- if(alwaysWriteToTmp){
- //we need to delete original file if that exists
- if(fs.exists(originalFilePath)){
- fs.delete(originalFilePath, true);
- }
- }
- fsOutput = fs.create(activeFilePath, (short) replication);
- }
- }
- }
- else {
- fsOutput = fs.create(activeFilePath, (short) replication);
- fs.setPermission(activeFilePath, FsPermission.createImmutable(filePermission));
- }
-
- LOG.info("opened {}, active {}", partFileName, activeFilePath);
- return new FSFilterStreamContext(fsOutput);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- };
+ //building cache
+ RemovalListener<String, FSFilterStreamContext> removalListener = createCacheRemoveListener();
+ CacheLoader<String, FSFilterStreamContext> loader = createCacheLoader();
+ streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis,
+ TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
- streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAcessMillis, TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
+ LOG.debug("File system class: {}", fs.getClass());
+ LOG.debug("end-offsets {}", endOffsets);
try {
- LOG.debug("File system class: {}", fs.getClass());
- LOG.debug("end-offsets {}", endOffsets);
-
//Restore the files in case they were corrupted and the operator was re-deployed.
Path writerPath = new Path(filePath);
if (fs.exists(writerPath)) {
@@ -467,7 +361,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
byte[] buffer = new byte[COPY_BUFFER_SIZE];
String recoveryFileName = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
- FSDataOutputStream fsOutput = fs.create(recoveryFilePath, (short) replication);
+ FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
while (inputStream.getPos() < offset) {
long remainingBytes = offset - inputStream.getPos();
@@ -492,9 +386,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
}
} else {
- if (alwaysWriteToTmp) {
+ if (alwaysWriteToTmp && filesWithOpenStreams.contains(seenFileName)) {
String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
- FSDataOutputStream outputStream = fs.create(new Path(filePath + Path.SEPARATOR + currentTmp));
+ FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
IOUtils.copy(inputStream, outputStream);
outputStream.close();
fileNameToTmpName.put(seenFileNamePart, currentTmp);
@@ -506,8 +400,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
if (rollingFile) {
- //delete the left over future rolling files produced from the previous crashed instance
- //of this operator.
+ //delete the left over future rolling files produced from the previous crashed instance of this operator.
for(String seenFileName: endOffsets.keySet()) {
try {
Integer fileOpenPart = this.openPart.get(seenFileName).getValue();
@@ -550,10 +443,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
rotate(seenFileName);
}
}
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e) {
+ catch (IOException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@@ -561,17 +451,178 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
LOG.debug("setup completed");
LOG.debug("end-offsets {}", endOffsets);
- }
- catch (IOException e) {
+
+ } catch (IOException e) {
throw new RuntimeException(e);
}
this.context = context;
- fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN,
- new MutableLong());
- fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS,
- new MutableLong());
+ fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong());
+ fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS, new MutableLong());
+ }
+
+ /**
+ * Creates the {@link CacheLoader} for loading an output stream when it is not present in the cache.
+ * @return cache loader
+ */
+ private CacheLoader<String, FSFilterStreamContext> createCacheLoader()
+ {
+ return new CacheLoader<String, FSFilterStreamContext>()
+ {
+ @Override
+ public FSFilterStreamContext load(@Nonnull String filename)
+ {
+ if (rollingFile) {
+ RotationState state = getRotationState(filename);
+ if (rollingFile && state.rotated) {
+ openPart.get(filename).add(1);
+ state.rotated = false;
+ MutableLong offset = endOffsets.get(filename);
+ offset.setValue(0);
+ }
+ }
+
+ String partFileName = getPartFileNamePri(filename);
+ Path originalFilePath = new Path(filePath + Path.SEPARATOR + partFileName);
+
+ Path activeFilePath;
+ if (!alwaysWriteToTmp) {
+ activeFilePath = originalFilePath;
+ } else {
+ //MLHR-1776 : writing to tmp file
+ String tmpFileName = fileNameToTmpName.get(partFileName);
+ if (tmpFileName == null) {
+ tmpFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+ fileNameToTmpName.put(partFileName, tmpFileName);
+ }
+ activeFilePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
+ }
+
+ FSDataOutputStream fsOutput;
+
+ boolean sawThisFileBefore = endOffsets.containsKey(filename);
+
+ try {
+ if (fs.exists(originalFilePath) || (alwaysWriteToTmp && fs.exists(activeFilePath))) {
+ if (sawThisFileBefore) {
+ FileStatus fileStatus = fs.getFileStatus(activeFilePath);
+ MutableLong endOffset = endOffsets.get(filename);
+
+ if (endOffset != null) {
+ endOffset.setValue(fileStatus.getLen());
+ } else {
+ endOffsets.put(filename, new MutableLong(fileStatus.getLen()));
+ }
+
+ fsOutput = openStream(activeFilePath, true);
+ LOG.debug("appending to {}", activeFilePath);
+ } else {
+ //We never saw this file before and we don't want to append
+ //If the file is rolling we need to delete all its parts.
+ if (rollingFile) {
+ int part = 0;
+
+ while (true) {
+ Path seenPartFilePath = new Path(filePath + Path.SEPARATOR + getPartFileName(filename, part));
+ if (!fs.exists(seenPartFilePath)) {
+ break;
+ }
+
+ fs.delete(seenPartFilePath, true);
+ part = part + 1;
+ }
+
+ fsOutput = openStream(activeFilePath, false);
+ } else {
+ //Not rolling is easy, just delete the file and create it again.
+ fs.delete(activeFilePath, true);
+ if (alwaysWriteToTmp) {
+ //we need to delete original file if that exists
+ if (fs.exists(originalFilePath)) {
+ fs.delete(originalFilePath, true);
+ }
+ }
+ fsOutput = openStream(activeFilePath, false);
+ }
+ }
+ } else {
+ fsOutput = openStream(activeFilePath, false);
+ }
+ filesWithOpenStreams.add(filename);
+
+ LOG.info("opened {}, active {}", partFileName, activeFilePath);
+ return new FSFilterStreamContext(fsOutput);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Creates the removal listener which is attached to the cache.
+ *
+ * @return cache entry removal listener.
+ */
+ private RemovalListener<String, FSFilterStreamContext> createCacheRemoveListener()
+ {
+ //When an entry is removed from the cache, removal listener is notified and it closes the output stream.
+ return new RemovalListener<String, FSFilterStreamContext>()
+ {
+ @Override
+ public void onRemoval(@Nonnull RemovalNotification<String, FSFilterStreamContext> notification)
+ {
+ FSFilterStreamContext streamContext = notification.getValue();
+ if (streamContext != null) {
+ try {
+ String filename = notification.getKey();
+ String partFileName = getPartFileNamePri(filename);
+
+ LOG.debug("closing {}", partFileName);
+ long start = System.currentTimeMillis();
+
+ closeStream(streamContext);
+ filesWithOpenStreams.remove(filename);
+
+ totalWritingTime += System.currentTimeMillis() - start;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ };
+ }
+
+ /**
+ * Opens the stream for the specified file path in either append mode or create mode.
+ *
+ * @param filepath this is the path of either the actual file or the corresponding temporary file.
+ * @param append true for opening the file in append mode; false otherwise.
+ * @return output stream.
+ * @throws IOException
+ */
+ protected FSDataOutputStream openStream(Path filepath, boolean append) throws IOException
+ {
+ FSDataOutputStream fsOutput;
+ if (append) {
+ fsOutput = fs.append(filepath);
+ } else {
+ fsOutput = fs.create(filepath, (short)replication);
+ fs.setPermission(filepath, FsPermission.createImmutable(filePermission));
+ }
+ return fsOutput;
+ }
+
+ /**
+ * Closes the stream which has been removed from the cache.
+ *
+ * @param streamContext stream context which is removed from the cache.
+ * @throws IOException
+ */
+ protected void closeStream(FSFilterStreamContext streamContext) throws IOException
+ {
+ streamContext.close();
}
/**
@@ -592,12 +643,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
MutableInt part = finalizedPart.get(fileName);
if (part == null) {
- part = new MutableInt();
+ part = new MutableInt(-1);
finalizedPart.put(fileName, part);
}
MutableInt currentOpenPart = openPart.get(fileName);
- for (int x = part.getValue() + 1; x < currentOpenPart.getValue(); x++) {
+ for (int x = part.getValue() + 1; x <= currentOpenPart.getValue(); x++) {
String prevPartNotFinalized = getPartFileName(fileName, x);
LOG.debug("request finalize {}", prevPartNotFinalized);
filesPerWindow.add(prevPartNotFinalized);
@@ -619,12 +670,11 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
//Close all the streams you can
Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap();
for(String seenFileName: openStreams.keySet()) {
- //FilterOutputStream filterStream = openStreams.get(seenFileName).getFilterStream();
FSFilterStreamContext fsFilterStreamContext = openStreams.get(seenFileName);
try {
long start = System.currentTimeMillis();
- //filterStream.close();
- fsFilterStreamContext.close();
+ closeStream(fsFilterStreamContext);
+ filesWithOpenStreams.remove(seenFileName);
totalWritingTime += System.currentTimeMillis() - start;
}
catch (IOException ex) {
@@ -727,11 +777,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
count.add(1);
- }
- catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- catch (ExecutionException ex) {
+ } catch (IOException | ExecutionException ex) {
throw new RuntimeException(ex);
}
}
@@ -743,24 +789,19 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* @throws IOException
* @throws ExecutionException
*/
- protected void rotate(String fileName) throws IllegalArgumentException,
- IOException,
- ExecutionException
+ protected void rotate(String fileName) throws IllegalArgumentException, IOException, ExecutionException
{
requestFinalize(fileName);
counts.remove(fileName);
streamsCache.invalidate(fileName);
MutableInt mi = openPart.get(fileName);
- int rotatedFileIndex = mi.getValue();
- mi.add(1);
- LOG.debug("Part file index: {}", openPart);
- endOffsets.get(fileName).setValue(0L);
- String partFileName = getPartFileName(fileName, rotatedFileIndex);
+ LOG.debug("Part file rotated {} : {}", fileName, mi.getValue());
+
+ //TODO: remove this as rotateHook is deprecated.
+ String partFileName = getPartFileName(fileName, mi.getValue());
rotateHook(partFileName);
- if (rotationWindows > 0) {
- getRotationState(fileName).rotated = true;
- }
+ getRotationState(fileName).rotated = true;
}
private RotationState getRotationState(String fileName)
@@ -778,6 +819,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
* the name of the file part that has just completed closed.
* @param finishedFile The name of the file part that has just completed and closed.
*/
+ @Deprecated
protected void rotateHook(@SuppressWarnings("unused") String finishedFile)
{
//Do nothing by default
@@ -887,15 +929,10 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
if (rotate) {
try {
rotate(filename);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
+ } catch (IOException | ExecutionException e) {
throw new RuntimeException(e);
}
}
- if (rotationState != null) {
- rotationState.rotated = false;
- }
}
}
}
@@ -1047,7 +1084,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
TOTAL_TIME_WRITING_MILLISECONDS
}
- private class FSFilterStreamContext implements FilterStreamContext<FilterOutputStream>
+ protected class FSFilterStreamContext implements FilterStreamContext<FilterOutputStream>
{
private FSDataOutputStream outputStream;