You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/27 04:46:08 UTC
[2/4] apex-malhar git commit: APEXMALHAR-2063 Made window data
manager use file system wal
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
index 3de6a1b..81f6aa0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
@@ -18,324 +18,687 @@
*/
package org.apache.apex.malhar.lib.wal;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeSet;
import javax.validation.constraints.NotNull;
-import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager;
+import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultimap;
+import com.google.common.primitives.Longs;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
/**
- * An {@link WindowDataManager} that uses FS to persist state.
+ * An {@link WindowDataManager} that uses FS to persist state every completed application window.<p/>
+ *
+ * FSWindowDataManager uses {@link FSWindowReplayWAL} to write to files. While saving an artifact corresponding
+ * to a window, the window data manager saves:
+ * <ol>
+ * <li>Window id</li>
+ * <li>Artifact</li>
+ * </ol>
+ * In order to ensure that all the entries corresponding to a window id are appended to the same wal part file, the
+ * wal operates in batch mode. In batch mode, the rotation of a wal part is done only after a batch is complete.<br/>
+ * <p/>
+ *
+ * <b>Replaying data of a completed window</b><br/>
+ * The main support that {@link WindowDataManager} provides to input operators is the ability to replay windows which
+ * were completely processed but not checkpointed. This is necessary for making input operators idempotent.<br/>
+ * The {@link FileSystemWAL}, however, ignores any data which is not checkpointed after failure. Therefore,
+ * {@link FSWindowDataManager} cannot rely solely on the state in wal after failures and so during recovery it modifies
+ * the wal state by traversing through the wal files.<br/>
+ * <br/>
+ * {@link IncrementalCheckpointManager}, however, relies only on the checkpointed state and therefore sets
+ * {@link #relyOnCheckpoints} to true. This is because {@link IncrementalCheckpointManager} only saves data per
+ * checkpoint window.
+ * <p/>
+ *
+ * <b>Purging of stale artifacts</b><br/>
+ * When a window gets committed, it indicates that all the operators in the DAG have completely finished processing that
+ * window. This means that the data of this window can be deleted as it will never be requested for replaying.
+ * Operators can invoke {@link #committed(long)} callback of {@link FSWindowDataManager} to trigger deletion of stale
+ * artifacts.<br/>
+ * <p/>
+ *
+ * <b>Dynamic partitioning support provided</b><br/>
+ * An operator can call {@link #partition(int, Set)} to get new instances of {@link FSWindowDataManager} during
+ * re-partitioning. When operator partitions are removed, then one of the new instances will handle the state of
+ * all deleted instances.<br/>
+ * After re-partitioning, the largest completed window is the minimum of the max completed windows across all partitions.<br/>
+ *
+ * <p/>
+ * At times, after re-partitioning, a physical operator may want to read the data saved by all the partitions for a
+ * completed window id. For example, {@link AbstractFileInputOperator} needs to redistribute files based on the hash
+ * of file-paths and its partition keys, so it reads artifacts saved by all partitions during replay of a completed
+ * window. {@link #retrieveAllPartitions(long)} retrieves the artifacts of all partitions for a completed window.
+ *
*
* @since 3.4.0
*/
public class FSWindowDataManager implements WindowDataManager
{
- private static final String DEF_RECOVERY_PATH = "idempotentState";
-
- protected transient FSStorageAgent storageAgent;
+ private static final String DEF_STATE_PATH = "idempotentState";
+ private static final String WAL_FILE_NAME = "wal";
/**
- * Recovery path relative to app path where state is saved.
+ * Path where state is saved; relative to the app path unless {@link #isStatePathRelativeToAppPath} is false.
*/
@NotNull
- private String recoveryPath;
-
- private boolean isRecoveryPathRelativeToAppPath = true;
+ private String statePath = DEF_STATE_PATH;
- /**
- * largest window for which there is recovery data across all physical operator instances.
- */
- protected transient long largestRecoveryWindow;
+ private boolean isStatePathRelativeToAppPath = true;
/**
* This is not null only for one physical instance.<br/>
* It consists of operator ids which have been deleted but have some state that can be replayed.
- * Only one of the instances would be handling (modifying) the files that belong to this state.
+ * Only one of the instances would be handling (modifying) the files that belong to this state. <br/>
+ * The value is assigned during partitioning.
*/
- protected Set<Integer> deletedOperators;
+ private Set<Integer> deletedOperators;
+
+ private boolean repartitioned;
/**
- * Sorted mapping from window id to all the operators that have state to replay for that window.
+ * Used when it is not necessary to replay every streaming/app window.
+ * Used by {@link IncrementalCheckpointManager}
*/
- protected final transient TreeMultimap<Long, Integer> replayState;
+ private boolean relyOnCheckpoints;
+
+ private transient long largestCompletedWindow = Stateless.WINDOW_ID;
+
+ private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
+
+ //operator id -> read-only wal of that operator (unordered HashMap)
+ private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();
+
+ private transient String fullStatePath;
+ private transient int operatorId;
+
+ private final transient Kryo kryo = new Kryo();
- protected transient FileSystem fs;
- protected transient Path appPath;
+ private transient FileContext fileContext;
public FSWindowDataManager()
{
- replayState = TreeMultimap.create();
- largestRecoveryWindow = Stateless.WINDOW_ID;
- recoveryPath = DEF_RECOVERY_PATH;
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
@Override
public void setup(Context.OperatorContext context)
{
- Configuration configuration = new Configuration();
- if (isRecoveryPathRelativeToAppPath) {
- appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+ operatorId = context.getId();
+
+ if (isStatePathRelativeToAppPath) {
+ fullStatePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + statePath;
} else {
- appPath = new Path(recoveryPath);
+ fullStatePath = statePath;
}
try {
- storageAgent = new FSStorageAgent(appPath.toString(), configuration);
-
- fs = FileSystem.newInstance(appPath.toUri(), configuration);
-
- if (fs.exists(appPath)) {
- FileStatus[] fileStatuses = fs.listStatus(appPath);
-
- for (FileStatus operatorDirStatus : fileStatuses) {
- int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
-
- for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
- String fileName = status.getPath().getName();
- if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
- continue;
- }
- long windowId = Long.parseLong(fileName, 16);
- replayState.put(windowId, operatorId);
- if (windowId > largestRecoveryWindow) {
- largestRecoveryWindow = windowId;
- }
+ fileContext = FileContextUtils.getFileContext(fullStatePath);
+ setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void setupWals(long activationWindow) throws IOException
+ {
+ findFiles(wal, operatorId);
+ configureWal(wal, operatorId, !relyOnCheckpoints);
+
+ if (repartitioned) {
+ createReadOnlyWals();
+ for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
+ findFiles(entry.getValue(), entry.getKey());
+ configureWal(entry.getValue(), entry.getKey(), true);
+ }
+ }
+
+ //find largest completed window
+ if (!relyOnCheckpoints) {
+ long completedWindow = findLargestCompletedWindow(wal, null);
+ //committed will not delete temp files so it is possible that when reading from files, a smaller window
+ //than the activation window is found.
+ if (completedWindow > activationWindow) {
+ largestCompletedWindow = completedWindow;
+ }
+ if (wal.getReader().getCurrentPointer() != null) {
+ wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy());
+ }
+ } else {
+ wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer();
+ largestCompletedWindow = wal.getLastCheckpointedWindow();
+ }
+
+ if (repartitioned && largestCompletedWindow > Stateless.WINDOW_ID) {
+ //find the min of max window ids: a downstream operator will not finish a window until all upstream partitions have finished it.
+ for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
+
+ long completedWindow = Stateless.WINDOW_ID;
+ if (!relyOnCheckpoints) {
+ long window = findLargestCompletedWindow(entry.getValue(), null);
+ if (window > activationWindow) {
+ completedWindow = window;
}
+ } else {
+ completedWindow = findLargestCompletedWindow(entry.getValue(), activationWindow);
+ }
+
+ if (completedWindow < largestCompletedWindow) {
+ largestCompletedWindow = completedWindow;
}
}
- } catch (IOException e) {
- throw new RuntimeException(e);
}
+
+ //reset readers
+ wal.getReader().seek(wal.walStartPointer);
+ for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+ wal.getReader().seek(wal.walStartPointer);
+ }
+
+ wal.setup();
+ for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+ wal.setup();
+ }
+
}
- @Override
- public void save(Object object, int operatorId, long windowId) throws IOException
+ protected void createReadOnlyWals() throws IOException
{
- storageAgent.save(object, operatorId, windowId);
+ RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(fullStatePath));
+ while (operatorsIter.hasNext()) {
+ FileStatus status = operatorsIter.next();
+ int operatorId = Integer.parseInt(status.getPath().getName());
+
+ if (operatorId != this.operatorId) {
+ //create read-only wal for other partitions
+ FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
+ readOnlyWals.put(operatorId, wal);
+ }
+ }
}
- @Override
- public Object load(int operatorId, long windowId) throws IOException
+ private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean updateWalState) throws IOException
{
- Set<Integer> operators = replayState.get(windowId);
- if (operators == null || !operators.contains(operatorId)) {
- return null;
+ String operatorDir = fullStatePath + Path.SEPARATOR + operatorId;
+ wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME);
+ wal.fileContext = fileContext;
+
+ if (updateWalState) {
+ if (!wal.fileDescriptors.isEmpty()) {
+ SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet();
+
+ wal.walStartPointer = new FileSystemWAL.FileSystemWALPointer(sortedParts.first(), 0);
+
+ FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(sortedParts.last()).last();
+ if (last.isTmp) {
+ wal.tempPartFiles.put(last.part, last.filePath.toString());
+ }
+ }
}
- return storageAgent.load(operatorId, windowId);
}
- @Override
- public void delete(int operatorId, long windowId) throws IOException
+ private void findFiles(FSWindowReplayWAL wal, int operatorId) throws IOException
{
- storageAgent.delete(operatorId, windowId);
+ String operatorDir = fullStatePath + Path.SEPARATOR + operatorId;
+ Path operatorPath = new Path(operatorDir);
+ if (fileContext.util().exists(operatorPath)) {
+ RemoteIterator<FileStatus> walFilesIter = fileContext.listStatus(operatorPath);
+
+ while (walFilesIter.hasNext()) {
+ FileStatus fileStatus = walFilesIter.next();
+ FSWindowReplayWAL.FileDescriptor descriptor = FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath());
+ wal.fileDescriptors.put(descriptor.part, descriptor);
+ }
+ }
}
- @Override
- public Map<Integer, Object> load(long windowId) throws IOException
+ private long findLargestCompletedWindow(FSWindowReplayWAL wal, Long ceilingWindow) throws IOException
+ {
+ if (!wal.fileDescriptors.isEmpty()) {
+ FileSystemWAL.FileSystemWALReader reader = wal.getReader();
+
+ //to find the largest window, we only need to look at the last file.
+ NavigableSet<Integer> descendingParts = new TreeSet<>(wal.fileDescriptors.keySet()).descendingSet();
+ for (int part : descendingParts) {
+ FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(part).last();
+ reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0));
+
+ long endOffset = -1;
+
+ long lastWindow = Stateless.WINDOW_ID;
+ Slice slice = readNext(reader);
+
+ while (slice != null) {
+ boolean skipComplete = skipNext(reader); //skip the artifact because we need just the largest window id.
+ if (!skipComplete) {
+ //artifact not saved so this window was not finished.
+ break;
+ }
+ long offset = reader.getCurrentPointer().getOffset();
+
+ long window = Longs.fromByteArray(slice.toByteArray());
+ if (ceilingWindow != null && window > ceilingWindow) {
+ break;
+ }
+ endOffset = offset;
+ lastWindow = window;
+ slice = readNext(reader); //either null or next window
+ }
+
+ if (endOffset != -1) {
+ wal.walEndPointerAfterRecovery = new FileSystemWAL.FileSystemWALPointer(last.part, endOffset);
+ wal.windowWalParts.put(lastWindow, wal.walEndPointerAfterRecovery.getPartNum());
+ return lastWindow;
+ }
+ }
+ }
+ return Stateless.WINDOW_ID;
+ }
+
+ /**
+ * Helper method that catches IOException while reading from wal to check if an entry was saved completely or not.
+ * @param reader wal reader
+ * @return wal entry
+ */
+ protected Slice readNext(FileSystemWAL.FileSystemWALReader reader)
{
- Set<Integer> operators = replayState.get(windowId);
- if (operators == null) {
+ try {
+ return reader.next();
+ } catch (IOException ex) {
+ //exception while reading wal entry which can be because there may have been failure while persisting an
+ //artifact so this window is not a finished window.
+ try {
+ reader.close();
+ } catch (IOException ioe) {
+ //closing the reader quietly.
+ }
return null;
}
- Map<Integer, Object> data = Maps.newHashMap();
- for (int operatorId : operators) {
- data.put(operatorId, load(operatorId, windowId));
+ }
+
+ /**
+ * Helper method that catches IOException while skipping an entry from wal to check if an entry was saved
+ * completely or not.
+ * @param reader wal reader
+ * @return true if skip was successful; false otherwise.
+ */
+ private boolean skipNext(FileSystemWAL.FileSystemWALReader reader)
+ {
+ try {
+ reader.skipNext();
+ return true;
+ } catch (IOException ex) {
+ //exception while skipping wal entry which can be because there may have been failure while persisting an
+ //artifact so this window is not a finished window.
+ try {
+ reader.close();
+ } catch (IOException e) {
+ //closing the reader quietly
+ }
+ return false;
}
- return data;
}
- @Override
- public long[] getWindowIds(int operatorId) throws IOException
+ private void closeReaders() throws IOException
{
- Path operatorPath = new Path(appPath, String.valueOf(operatorId));
- if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) {
- return null;
+ //close all reader streams and remove read-only wals that do not belong to deleted operators
+ wal.getReader().close();
+ if (readOnlyWals.size() > 0) {
+ Iterator<Map.Entry<Integer, FSWindowReplayWAL>> walIterator = readOnlyWals.entrySet().iterator();
+ while (walIterator.hasNext()) {
+ Map.Entry<Integer, FSWindowReplayWAL> entry = walIterator.next();
+ entry.getValue().getReader().close();
+
+ int operatorId = entry.getKey();
+ if (deletedOperators == null || !deletedOperators.contains(operatorId)) {
+ //the read only wal can be removed.
+ walIterator.remove();
+ }
+ }
}
- return storageAgent.getWindowIds(operatorId);
+ }
+
+ /**
+ * Save writes 2 entries to the wal: <br/>
+ * <ol>
+ * <li>window id</li>
+ * <li>artifact</li>
+ * </ol>
+ * Note: The wal is being used in batch mode so the part file will never be rotated between the 2 entries.<br/>
+ * The wal part file may be rotated after both the entries, when
+ * {@link FileSystemWAL.FileSystemWALWriter#rotateIfNecessary()} is triggered.
+ *
+ * @param object state
+ * @param windowId window id
+ * @throws IOException
+ */
+ @Override
+ public void save(Object object, long windowId) throws IOException
+ {
+ closeReaders();
+ FileSystemWAL.FileSystemWALWriter writer = wal.getWriter();
+
+ byte[] windowIdBytes = Longs.toByteArray(windowId);
+ writer.append(new Slice(windowIdBytes));
+ writer.append(toSlice(object));
+ wal.beforeCheckpoint(windowId);
+ wal.windowWalParts.put(windowId, writer.getCurrentPointer().getPartNum());
+ writer.rotateIfNecessary();
}
+ /**
+ * The implementation assumes that artifacts are retrieved in increasing order of window ids. Typically it is used
+ * to replay tuples of successive windows in input operators after failure.
+ * @param windowId window id
+ * @return saved state for the window id.
+ * @throws IOException
+ */
@Override
- public long[] getWindowIds() throws IOException
+ public Object retrieve(long windowId) throws IOException
{
- SortedSet<Long> windowIds = replayState.keySet();
- long[] windowIdsArray = new long[windowIds.size()];
+ return retrieve(wal, windowId);
+ }
- int index = 0;
+ @Override
+ public Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException
+ {
+ if (windowId > largestCompletedWindow) {
+ return null;
+ }
+ Map<Integer, Object> artifacts = Maps.newHashMap();
+ Object artifact = retrieve(wal, windowId);
+ if (artifact != null) {
+ artifacts.put(operatorId, artifact);
+ }
+ if (repartitioned) {
+ for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
+ artifact = retrieve(entry.getValue(), windowId);
+ if (artifact != null) {
+ artifacts.put(entry.getKey(), artifact);
+ }
+ }
+ }
+ return artifacts;
+ }
- for (Long windowId: windowIds) {
- windowIdsArray[index] = windowId;
- index++;
+ private Object retrieve(FSWindowReplayWAL wal, long windowId) throws IOException
+ {
+ if (windowId > largestCompletedWindow || wal.walEndPointerAfterRecovery == null) {
+ return null;
}
- return windowIdsArray;
- }
+ FileSystemWAL.FileSystemWALReader reader = wal.getReader();
+
+ while (reader.getCurrentPointer() == null ||
+ reader.getCurrentPointer().compareTo(wal.walEndPointerAfterRecovery) < 0) {
+ long currentWindow;
+ if (wal.retrievedWindow == null) {
+ wal.retrievedWindow = readNext(reader);
+ Preconditions.checkNotNull(wal.retrievedWindow);
+ }
+ currentWindow = Longs.fromByteArray(wal.retrievedWindow.toByteArray());
+
+ if (windowId == currentWindow) {
+ Slice data = readNext(reader);
+ Preconditions.checkNotNull(data, "data is null");
+
+ wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum());
+ wal.retrievedWindow = readNext(reader); //null or next window
+
+ return fromSlice(data);
+ } else if (windowId < currentWindow) {
+ //no artifact saved corresponding to that window and artifact is not read.
+ return null;
+ } else {
+ //windowId > current window so we skip the data
+ skipNext(reader);
+ wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum());
+
+ wal.retrievedWindow = readNext(reader); //null or next window
+ if (wal.retrievedWindow == null) {
+ //nothing else to read
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
/**
- * This deletes all the recovery files of window ids <= windowId.
+ * Deletes artifacts for all windows less than or equal to the committed window id.<p/>
+ *
+ * {@link FSWindowDataManager} uses {@link FSWindowReplayWAL} to record data which writes to temp part files.
+ * The temp part files are finalized only when they are rotated. So when a window is committed, artifacts for
+ * windows <= committed window may still be in temporary files. These temporary files are needed for Wal recovery so
+ * we do not alter them and we delete a part file completely (as opposed to partial deletion) for efficiency.<br/>
+ * Therefore, data of a window gets deleted only when it satisfies all the following criteria:
+ * <ul>
+ * <li>window <= committed window id</li>
+ * <li>the part file of the artifact is rotated.</li>
+ * <li>the part file doesn't contain artifacts for windows greater than the artifact's window to avoid partial
+ * file deletion.</li>
+ * </ul>
*
- * @param operatorId operator id.
- * @param windowId the largest window id for which the states will be deleted.
+ * In addition to this we also delete:
+ * <ol>
+ * <li>Some stray temporary files are also deleted which correspond to completely deleted parts.</li>
+ * <li>Once the committed window > largest completed window, we delete the files of partitions that were removed.</li>
+ * </ol>
+ *
+ * @param committedWindowId window id
* @throws IOException
*/
@Override
- public void deleteUpTo(int operatorId, long windowId) throws IOException
+ public void committed(long committedWindowId) throws IOException
{
- //deleting the replay state
- if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
- Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
- long lwindow = windowEntry.getKey();
- if (lwindow > windowId) {
- break;
- }
- for (Integer loperator : windowEntry.getValue()) {
-
- if (deletedOperators.contains(loperator)) {
- storageAgent.delete(loperator, lwindow);
-
- Path loperatorPath = new Path(appPath, Integer.toString(loperator));
- if (fs.listStatus(loperatorPath).length == 0) {
- //The operator was deleted and it has nothing to replay.
- deletedOperators.remove(loperator);
- fs.delete(loperatorPath, true);
- }
- } else if (loperator == operatorId) {
- storageAgent.delete(loperator, lwindow);
+ closeReaders();
+ //find the largest window <= committed window id whose corresponding part file is finalized.
+ Map.Entry<Long, Integer> largestEntryForDeletion = null;
+
+ Iterator<Map.Entry<Long, Integer>> iterator = wal.windowWalParts.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<Long, Integer> entry = iterator.next();
+ //only completely finalized part files are deleted.
+ if (entry.getKey() <= committedWindowId && !wal.tempPartFiles.containsKey(entry.getValue())) {
+ largestEntryForDeletion = entry;
+ iterator.remove();
+ }
+ if (entry.getKey() > committedWindowId) {
+ break;
+ }
+ }
+
+ if (largestEntryForDeletion != null && !wal.windowWalParts.containsValue(
+ largestEntryForDeletion.getValue()) /* no artifacts for higher window present*/) {
+
+ int highestPartToDelete = largestEntryForDeletion.getValue();
+ wal.getWriter().delete(new FileSystemWAL.FileSystemWALPointer(highestPartToDelete + 1, 0));
+
+ //also delete any old stray temp files that correspond to parts <= highestPartToDelete
+ Iterator<Map.Entry<Integer, FSWindowReplayWAL.FileDescriptor>> fileIterator =
+ wal.fileDescriptors.entries().iterator();
+ while (fileIterator.hasNext()) {
+ Map.Entry<Integer, FSWindowReplayWAL.FileDescriptor> entry = fileIterator.next();
+ if (entry.getKey() <= highestPartToDelete && entry.getValue().isTmp) {
+ if (fileContext.util().exists(entry.getValue().filePath)) {
+ fileContext.delete(entry.getValue().filePath, true);
}
+ } else if (entry.getKey() > highestPartToDelete) {
+ break;
}
- iterator.remove();
}
}
- if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) {
- long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId);
- Arrays.sort(windowsAfterReplay);
- for (long lwindow : windowsAfterReplay) {
- if (lwindow <= windowId) {
- storageAgent.delete(operatorId, lwindow);
+ //delete data of partitions that have been removed
+ if (deletedOperators != null) {
+ Iterator<Integer> operatorIter = deletedOperators.iterator();
+
+ while (operatorIter.hasNext()) {
+ int deletedOperatorId = operatorIter.next();
+ FSWindowReplayWAL wal = readOnlyWals.get(deletedOperatorId);
+ if (committedWindowId > largestCompletedWindow) {
+ Path operatorDir = new Path(fullStatePath + Path.SEPARATOR + deletedOperatorId);
+
+ if (fileContext.util().exists(operatorDir)) {
+ fileContext.delete(operatorDir, true);
+ }
+ wal.teardown();
+ operatorIter.remove();
+ readOnlyWals.remove(deletedOperatorId);
}
}
+
+ if (deletedOperators.isEmpty()) {
+ deletedOperators = null;
+ }
}
}
- @Override
- public long getLargestRecoveryWindow()
+ private Slice toSlice(Object object)
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Output output = new Output(baos);
+ kryo.writeClassAndObject(output, object);
+ output.close();
+ byte[] bytes = baos.toByteArray();
+
+ return new Slice(bytes);
+ }
+
+ protected Object fromSlice(Slice slice)
+ {
+ Input input = new Input(slice.buffer, slice.offset, slice.length);
+ Object object = kryo.readClassAndObject(input);
+ input.close();
+ return object;
+ }
+
+ public long getLargestCompletedWindow()
{
- return largestRecoveryWindow;
+ return largestCompletedWindow;
}
@Override
- public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+ public List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds)
{
- Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(),
- "there has to be one idempotent storage manager");
- org.apache.apex.malhar.lib.wal.FSWindowDataManager deletedOperatorsManager = null;
+ repartitioned = true;
+ KryoCloneUtils<FSWindowDataManager> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+
+ FSWindowDataManager[] windowDataManagers = cloneUtils.getClones(newCount);
if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
- if (this.deletedOperators == null) {
- this.deletedOperators = Sets.newHashSet();
- }
- this.deletedOperators.addAll(removedOperatorIds);
+ windowDataManagers[0].deletedOperators = removedOperatorIds;
}
- for (WindowDataManager storageManager : newManagers) {
-
- org.apache.apex.malhar.lib.wal.FSWindowDataManager lmanager = (org.apache.apex.malhar.lib.wal.FSWindowDataManager)storageManager;
- lmanager.recoveryPath = this.recoveryPath;
- lmanager.storageAgent = this.storageAgent;
+ List<WindowDataManager> mangers = new ArrayList<>();
+ mangers.addAll(Arrays.asList(windowDataManagers));
+ return mangers;
+ }
- if (lmanager.deletedOperators != null) {
- deletedOperatorsManager = lmanager;
- }
- //only one physical instance can manage deleted operators so clearing this field for rest of the instances.
- if (lmanager != deletedOperatorsManager) {
- lmanager.deletedOperators = null;
- }
+ @Override
+ public void teardown()
+ {
+ wal.teardown();
+ for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+ wal.teardown();
}
+ }
- if (removedOperatorIds == null || removedOperatorIds.isEmpty()) {
- //Nothing to do
- return;
- }
- if (this.deletedOperators != null) {
-
- /*If some operators were removed then there needs to be a manager which can clean there state when it is not
- needed.*/
- if (deletedOperatorsManager == null) {
- //None of the managers were handling deleted operators data.
- deletedOperatorsManager = (org.apache.apex.malhar.lib.wal.FSWindowDataManager)newManagers.iterator().next();
- deletedOperatorsManager.deletedOperators = Sets.newHashSet();
- }
+ protected void setRelyOnCheckpoints(boolean relyOnCheckpoints)
+ {
+ this.relyOnCheckpoints = relyOnCheckpoints;
+ }
- deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds);
- }
+ /**
+ * @return wal instance
+ */
+ protected FSWindowReplayWAL getWal()
+ {
+ return wal;
}
- @Override
- public void teardown()
+ @VisibleForTesting
+ public Set<Integer> getDeletedOperators()
{
- try {
- fs.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (deletedOperators == null) {
+ return null;
}
+ return ImmutableSet.copyOf(deletedOperators);
}
/**
- * @return recovery path
+ * @return state path
*/
- public String getRecoveryPath()
+ public String getStatePath()
{
- return recoveryPath;
+ return statePath;
}
/**
- * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative
- * to the application path; otherwise it is handled as an absolute path.
+ * Sets the state path. If {@link #isStatePathRelativeToAppPath} is true then this path is handled
+ * relative to the application path;
+ * otherwise it is handled as an absolute path.
*
- * @param recoveryPath recovery path
+ * @param statePath state path
*/
- public void setRecoveryPath(String recoveryPath)
+ public void setStatePath(String statePath)
{
- this.recoveryPath = recoveryPath;
+ this.statePath = statePath;
}
/**
- * @return true if recovery path is relative to app path; false otherwise.
+ * @return true if state path is relative to app path; false otherwise.
*/
- public boolean isRecoveryPathRelativeToAppPath()
+ public boolean isStatePathRelativeToAppPath()
{
- return isRecoveryPathRelativeToAppPath;
+ return isStatePathRelativeToAppPath;
}
/**
- * Specifies whether the recovery path is relative to application path.
+ * Specifies whether the state path is relative to the application path.
*
- * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise.
+ * @param statePathRelativeToAppPath true if state path is relative to application path; false
+ * otherwise.
*/
- public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath)
+ public void setStatePathRelativeToAppPath(boolean statePathRelativeToAppPath)
{
- isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath;
+ isStatePathRelativeToAppPath = statePathRelativeToAppPath;
}
+
+ private static Logger LOG = LoggerFactory.getLogger(FSWindowDataManager.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
new file mode 100644
index 0000000..326c7a3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.wal;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.TreeMultimap;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A {@link FileSystemWAL} that WindowDataManager uses to save state of every window.
+ * <p/>
+ * The wal is written in batch mode and its part files are finalized as soon as they are rotated, without waiting
+ * for a window to be committed.
+ */
+public class FSWindowReplayWAL extends FileSystemWAL
+{
+ transient boolean readOnly;
+
+ transient TreeMultimap<Integer, FileDescriptor> fileDescriptors = TreeMultimap.create();
+
+ //all the readers will read to this point while replaying.
+ transient FileSystemWALPointer walEndPointerAfterRecovery;
+ transient Slice retrievedWindow;
+
+ transient TreeMap<Long, Integer> windowWalParts = new TreeMap<>();
+
+ FSWindowReplayWAL()
+ {
+ super();
+ setInBatchMode(true);
+ setFileSystemWALWriter(new WriterThatFinalizesImmediately(this));
+ }
+
+ /**
+ * @param readOnly when true, {@link #setup()} does not recover the writer.
+ */
+ FSWindowReplayWAL(boolean readOnly)
+ {
+ this();
+ this.readOnly = readOnly;
+ }
+
+ /**
+ * Defaults the max part length to the file system block size when it is not set and, unless read-only, recovers
+ * the writer's active part.
+ * <p/>
+ * NOTE(review): unlike {@link FileSystemWAL#setup()} this does not assign {@code fileContext}; it assumes the
+ * owner has already set it.
+ */
+ @Override
+ public void setup()
+ {
+ try {
+ if (getMaxLength() == 0) {
+ setMaxLength(fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize());
+ }
+ if (!readOnly) {
+ getWriter().recover();
+ }
+ } catch (IOException e) {
+ //keep the cause so the underlying file system failure is not lost
+ throw new RuntimeException("while setup", e);
+ }
+ }
+
+ public FileSystemWALPointer getWalEndPointerAfterRecovery()
+ {
+ return walEndPointerAfterRecovery;
+ }
+
+ /**
+ * Finalizes files just after rotation. Doesn't wait for the window to be committed.
+ */
+ static class WriterThatFinalizesImmediately extends FileSystemWAL.FileSystemWALWriter
+ {
+
+ private WriterThatFinalizesImmediately()
+ {
+ super();
+ }
+
+ protected WriterThatFinalizesImmediately(@NotNull FileSystemWAL fileSystemWal)
+ {
+ super(fileSystemWal);
+ }
+
+ /**
+ * Finalize the file immediately after rotation.
+ * @param partNum part number
+ * @throws IOException if finalizing the part fails
+ */
+ @Override
+ protected void rotated(int partNum) throws IOException
+ {
+ finalize(partNum);
+ }
+
+ /**
+ * Only restores the active part; unlike the base writer it does not delete stray tmp files.
+ */
+ @Override
+ protected void recover() throws IOException
+ {
+ restoreActivePart();
+ }
+ }
+
+ /**
+ * Describes a wal part file: its part number and, for a temporary file, the creation time embedded in its name.
+ * Instances are ordered by part number, then temporary before finalized, then by time. equals/hashCode ignore
+ * {@link #filePath}.
+ */
+ static class FileDescriptor implements Comparable<FileDescriptor>
+ {
+ int part;
+ boolean isTmp;
+ long time;
+ Path filePath;
+
+ /**
+ * Parses a wal part file name. A finalized part is named {@code wal_N} and a temporary part
+ * {@code wal_N.TIME.tmp}, where the wal name itself may contain '_' or '.'.
+ *
+ * @param filePath path of a wal part file
+ * @return descriptor of the part file
+ * @throws NumberFormatException if the part number or time is not numeric
+ * @throws IllegalArgumentException if a tmp file name has no time component
+ */
+ static FileDescriptor create(Path filePath)
+ {
+ FileDescriptor descriptor = new FileDescriptor();
+ descriptor.filePath = filePath;
+
+ String name = filePath.getName();
+ if (name.endsWith(TMP_EXTENSION)) {
+ //tmp files are the part file name followed by ".<time millis>" and TMP_EXTENSION
+ name = name.substring(0, name.length() - TMP_EXTENSION.length());
+ int timeStart = name.lastIndexOf('.');
+ if (timeStart < 0) {
+ throw new IllegalArgumentException("not a wal tmp part file " + filePath);
+ }
+ descriptor.isTmp = true;
+ descriptor.time = Long.parseLong(name.substring(timeStart + 1));
+ name = name.substring(0, timeStart);
+ }
+
+ //the part number follows the last '_' so that underscores in the wal name are tolerated
+ descriptor.part = Integer.parseInt(name.substring(name.lastIndexOf('_') + 1));
+ return descriptor;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof FileDescriptor)) {
+ return false;
+ }
+
+ FileDescriptor that = (FileDescriptor)o;
+
+ if (part != that.part) {
+ return false;
+ }
+ if (isTmp != that.isTmp) {
+ return false;
+ }
+ return time == that.time;
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = part;
+ result = 31 * result + (isTmp ? 1 : 0);
+ result = 31 * result + (int)(time ^ (time >>> 32));
+ return result;
+ }
+
+ @Override
+ public int compareTo(FileDescriptor o)
+ {
+ int byPart = Integer.compare(part, o.part);
+ if (byPart != 0) {
+ return byPart;
+ }
+ if (isTmp != o.isTmp) {
+ //a tmp file sorts before the finalized file of the same part
+ return isTmp ? -1 : 1;
+ }
+ return Long.compare(time, o.time);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
index f454188..b7d5ba1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java
@@ -56,7 +56,7 @@ import com.datatorrent.netlet.util.Slice;
* <p/>
* Note:<br/>
* The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause
- * problems. Typically the WAL Reader will only used in recovery.<br/>
+ * problems. Typically the WAL Reader will only be used in recovery or replay of finished windows.<br/>
*
* Also this implementation is thread unsafe- the filesystem wal writer and reader operations should be performed in
* operator's thread.
@@ -73,7 +73,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
@Min(0)
private long maxLength;
- private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
+ FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
@NotNull
private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
@@ -81,21 +81,26 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
@NotNull
private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
- //part => tmp file path;
- private final Map<Integer, String> tempPartFiles = new TreeMap<>();
+ //part => tmp file path;
+ final Map<Integer, String> tempPartFiles = new TreeMap<>();
private long lastCheckpointedWindow = Stateless.WINDOW_ID;
+ private boolean hardLimitOnMaxLength;
+
+ private boolean inBatchMode;
+
+ transient FileContext fileContext;
+
@Override
public void setup()
{
try {
- FileContext fileContext = FileContextUtils.getFileContext(filePath);
+ fileContext = FileContextUtils.getFileContext(filePath);
if (maxLength == 0) {
maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
}
- fileSystemWALWriter.open(fileContext);
- fileSystemWALReader.open(fileContext);
+ fileSystemWALWriter.recover();
} catch (IOException e) {
throw new RuntimeException("during setup", e);
}
@@ -158,6 +163,14 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
{
return filePath + "_" + partNumber;
}
+
+ /**
+ * Returns the pointer at which a reader starts when it has not been positioned yet. The returned object is the
+ * live pointer, not a copy.
+ *
+ * @return the wal start pointer
+ */
+ public FileSystemWALPointer getWalStartPointer()
+ {
+ return this.walStartPointer;
+ }
@Override
public FileSystemWALReader getReader()
@@ -227,6 +240,47 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
this.maxLength = maxLength;
}
+ /**
+ * @return true if there is a hard limit on max length; false otherwise.
+ */
+ public boolean isHardLimitOnMaxLength()
+ {
+ return hardLimitOnMaxLength;
+ }
+
+ /**
+ * When hard limit on max length is true, then a wal part file will never exceed the max length.<br/>
+ * An entry is appended to the next part if adding it to the current part would exceed the max length.
+ * <p/>
+ * When hard limit on max length is false, then a wal part file can exceed the max length if the last entry makes
+ * the wal part exceed the max length. By default this is set to false.
+ *
+ * @param hardLimitOnMaxLength true to never let a part file grow beyond the max length; false otherwise.
+ */
+ public void setHardLimitOnMaxLength(boolean hardLimitOnMaxLength)
+ {
+ this.hardLimitOnMaxLength = hardLimitOnMaxLength;
+ }
+
+ /**
+ * @return true if writing in batch mode; false otherwise.
+ */
+ protected boolean isInBatchMode()
+ {
+ return inBatchMode;
+ }
+
+ /**
+ * When in batch mode, a file is rotated only when a batch gets completed. This facilitates writing multiple entries
+ * that will all be written to the same part file.
+ *
+ * @param inBatchMode write in batch mode or not.
+ */
+ protected void setInBatchMode(boolean inBatchMode)
+ {
+ this.inBatchMode = inBatchMode;
+ }
+
+
public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
{
private final int partNum;
@@ -261,6 +315,11 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
return offset;
}
+ /**
+ * Creates a new pointer with the same part number and offset, so later changes to this pointer's offset do not
+ * affect the copy.
+ *
+ * @return an independent copy of this pointer
+ */
+ public FileSystemWALPointer getCopy()
+ {
+ FileSystemWALPointer copy = new FileSystemWALPointer(this.partNum, this.offset);
+ return copy;
+ }
+
@Override
public String toString()
{
@@ -273,16 +332,15 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
*/
public static class FileSystemWALReader implements WAL.WALReader<FileSystemWALPointer>
{
- private FileSystemWALPointer currentPointer;
+ private transient FileSystemWALPointer currentPointer;
private transient DataInputStream inputStream;
private transient Path currentOpenPath;
private transient boolean isOpenPathTmp;
private final FileSystemWAL fileSystemWAL;
- private transient FileContext fileContext;
- private FileSystemWALReader()
+ protected FileSystemWALReader()
{
//for kryo
fileSystemWAL = null;
@@ -291,16 +349,10 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
+ /**
+ * Creates a reader over the given wal.
+ *
+ * @param fileSystemWal wal to read; must not be null
+ */
 public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
 {
 this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
- currentPointer = new FileSystemWALPointer(fileSystemWal.walStartPointer.partNum,
- fileSystemWal.walStartPointer.offset);
+ //currentPointer is not initialized here; it is set from the wal start pointer on the first read
 }
- protected void open(@NotNull FileContext fileContext) throws IOException
- {
- this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
- }
-
- protected void close() throws IOException
+ @Override
+ public void close() throws IOException
{
if (inputStream != null) {
inputStream.close();
@@ -350,9 +402,9 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
isOpenPathTmp = false;
}
- LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer);
- if (fileContext.util().exists(pathToReadFrom)) {
- DataInputStream stream = fileContext.open(pathToReadFrom);
+ LOG.debug("filePath to read {} and pointer {}", pathToReadFrom, walPointer);
+ if (fileSystemWAL.fileContext.util().exists(pathToReadFrom)) {
+ DataInputStream stream = fileSystemWAL.fileContext.open(pathToReadFrom);
if (walPointer.offset > 0) {
stream.skip(walPointer.offset);
}
@@ -365,6 +417,20 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
@Override
public Slice next() throws IOException
{
+ return readOrSkip(false);
+ }
+
+ @Override
+ public void skipNext() throws IOException
+ {
+ readOrSkip(true);
+ }
+
+ private Slice readOrSkip(boolean skip) throws IOException
+ {
+ if (currentPointer == null) {
+ currentPointer = fileSystemWAL.walStartPointer;
+ }
do {
if (inputStream == null) {
inputStream = getInputStream(currentPointer);
@@ -376,21 +442,36 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
inputStream = getInputStream(currentPointer);
}
- if (inputStream != null && currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
+ if (inputStream != null && currentPointer.offset <
+ fileSystemWAL.fileContext.getFileStatus(currentOpenPath).getLen()) {
int len = inputStream.readInt();
Preconditions.checkState(len >= 0, "negative length");
- byte[] data = new byte[len];
- inputStream.readFully(data);
+ if (!skip) {
+ byte[] data = new byte[len];
+ inputStream.readFully(data);
+ currentPointer.offset += len + 4;
+ return new Slice(data);
- currentPointer.offset += data.length + 4;
- return new Slice(data);
+ } else {
+ long actualSkipped = inputStream.skip(len);
+ if (actualSkipped != len) {
+ throw new IOException("unable to skip " + len);
+ }
+ currentPointer.offset += len + 4;
+ return null;
+ }
}
} while (nextSegment());
close();
return null;
}
+ /**
+ * Returns the reader's current position as the live pointer (not a copy). It is null until the reader is first
+ * positioned, e.g. by a read.
+ *
+ * @return current read pointer, possibly null
+ */
+ public FileSystemWALPointer getCurrentPointer()
+ {
+ return this.currentPointer;
+ }
+
@Override
public FileSystemWALPointer getStartPointer()
{
@@ -410,52 +491,52 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
private final FileSystemWAL fileSystemWAL;
- private transient FileContext fileContext;
private int latestFinalizedPart = -1;
private int lowestDeletedPart = -1;
- private FileSystemWALWriter()
+ protected FileSystemWALWriter()
 {
 //for kryo
 fileSystemWAL = null;
 }
+ /**
+ * Creates a writer for the given wal. Kept public, like the reader's constructor, so existing callers outside
+ * the package keep compiling.
+ *
+ * @param fileSystemWal wal this writer appends to; must not be null
+ */
 public FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal)
 {
 this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
 }
- protected void open(@NotNull FileContext fileContext) throws IOException
+ /**
+ * Recovers the writer: restores the active temporary part and then deletes this wal's temporary files that are
+ * not tracked for parts up to the current part.
+ *
+ * @throws IOException if a file system operation fails
+ */
+ protected void recover() throws IOException
 {
- this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
- recover();
+ restoreActivePart();
+ deleteStrayTmpFiles();
 }
- private void recover() throws IOException
+ /**
+ * Restores the part that was active at the checkpoint. If a temporary file is tracked for the current part and
+ * still exists, its content up to {@code currentPointer.offset} is copied into a newly created temporary file,
+ * which becomes the writer's output.
+ *
+ * @throws IOException if reading the old temporary file or writing the new one fails
+ */
+ void restoreActivePart() throws IOException
 {
- LOG.debug("current point", currentPointer);
+ LOG.debug("restore part {}", currentPointer);
 String tmpFilePath = fileSystemWAL.tempPartFiles.get(currentPointer.getPartNum());
 if (tmpFilePath != null) {
- Path tmpPath = new Path(tmpFilePath);
- if (fileContext.util().exists(tmpPath)) {
- LOG.debug("tmp path exists {}", tmpPath);
+ Path inputPath = new Path(tmpFilePath);
+ if (fileSystemWAL.fileContext.util().exists(inputPath)) {
+ LOG.debug("input path exists {}", inputPath);
+ //temp file output stream
 outputStream = getOutputStream(new FileSystemWALPointer(currentPointer.partNum, 0));
- DataInputStream inputStreamOldTmp = fileContext.open(tmpPath);
- IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
- outputStream.flush();
- //remove old tmp
- inputStreamOldTmp.close();
- LOG.debug("delete tmp {}", tmpPath);
- fileContext.delete(tmpPath, true);
+ //try-with-resources closes the old tmp file's stream even if the copy or flush fails
+ try (DataInputStream inputStream = fileSystemWAL.fileContext.open(inputPath)) {
+ IOUtils.copyPartial(inputStream, currentPointer.offset, outputStream);
+ flush();
+ }
 }
 }
+ }
- //find all valid path names
+ void deleteStrayTmpFiles() throws IOException
+ {
+ //find all valid filePath names
Set<String> validPathNames = new HashSet<>();
for (Map.Entry<Integer, String> entry : fileSystemWAL.tempPartFiles.entrySet()) {
if (entry.getKey() <= currentPointer.partNum) {
@@ -468,22 +549,22 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
//which aren't accounted by tmp files map
Path walPath = new Path(fileSystemWAL.filePath);
Path parentWAL = walPath.getParent();
- if (parentWAL != null && fileContext.util().exists(parentWAL)) {
- RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
+ if (parentWAL != null && fileSystemWAL.fileContext.util().exists(parentWAL)) {
+ RemoteIterator<FileStatus> remoteIterator = fileSystemWAL.fileContext.listStatus(parentWAL);
while (remoteIterator.hasNext()) {
FileStatus status = remoteIterator.next();
String fileName = status.getPath().getName();
if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
!validPathNames.contains(fileName)) {
LOG.debug("delete stray tmp {}", status.getPath());
- fileContext.delete(status.getPath(), true);
+ fileSystemWAL.fileContext.delete(status.getPath(), true);
}
}
}
-
}
- protected void close() throws IOException
+ @Override
+ public void close() throws IOException
{
if (outputStream != null) {
outputStream.close();
@@ -499,23 +580,27 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
outputStream = getOutputStream(currentPointer);
}
- int entryLength = entry.length + 4;
+ int entrySize = entry.length + 4;
+
+ if (fileSystemWAL.hardLimitOnMaxLength) {
+ Preconditions.checkArgument(entrySize > fileSystemWAL.maxLength, "entry too big. increase the max length");
+ }
// rotate if needed
- if (shouldRotate(entryLength)) {
+ if (fileSystemWAL.hardLimitOnMaxLength && shouldRotate(entrySize) && !fileSystemWAL.inBatchMode) {
rotate(true);
}
outputStream.writeInt(entry.length);
outputStream.write(entry.buffer, entry.offset, entry.length);
- currentPointer.offset += entryLength;
+ currentPointer.offset += entrySize;
- if (currentPointer.offset == fileSystemWAL.maxLength) {
+ if (currentPointer.offset >= fileSystemWAL.maxLength && !fileSystemWAL.inBatchMode) {
//if the file is completed then we can rotate it. do not have to wait for next entry
rotate(false);
}
- return entryLength;
+ return entrySize;
}
@Override
@@ -543,9 +628,9 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
if (i <= latestFinalizedPart) {
//delete a part only if it is finalized.
Path partPath = new Path(fileSystemWAL.getPartFilePath(i));
- if (fileContext.util().exists(partPath)) {
+ if (fileSystemWAL.fileContext.util().exists(partPath)) {
LOG.debug("delete {}", partPath);
- fileContext.delete(partPath, true);
+ fileSystemWAL.fileContext.delete(partPath, true);
lastPartDeleted = i;
} else {
break;
@@ -560,18 +645,18 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
if (pointer.partNum <= latestFinalizedPart && pointer.offset > 0) {
String part = fileSystemWAL.getPartFilePath(pointer.partNum);
Path inputPartPath = new Path(part);
- long length = fileContext.getFileStatus(inputPartPath).getLen();
+ long length = fileSystemWAL.fileContext.getFileStatus(inputPartPath).getLen();
LOG.debug("truncate {} from {} length {}", part, pointer.offset, length);
if (length > pointer.offset) {
- String temp = getTmpFilePath(part);
+ String temp = createTmpFilePath(part);
Path tmpPart = new Path(temp);
- DataInputStream inputStream = fileContext.open(inputPartPath);
- DataOutputStream outputStream = fileContext.create(tmpPart, EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
- Options.CreateOpts.CreateParent.createParent());
+ DataInputStream inputStream = fileSystemWAL.fileContext.open(inputPartPath);
+ DataOutputStream outputStream = fileSystemWAL.fileContext.create(tmpPart,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent());
IOUtils.copyPartial(inputStream, pointer.offset, length - pointer.offset, outputStream);
inputStream.close();
@@ -582,7 +667,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
fileSystemWAL.walStartPointer.offset = 0;
}
- fileContext.rename(tmpPart, inputPartPath, Options.Rename.OVERWRITE);
+ fileSystemWAL.fileContext.rename(tmpPart, inputPartPath, Options.Rename.OVERWRITE);
}
}
}
@@ -590,8 +675,8 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
protected void flush() throws IOException
{
if (outputStream != null) {
- if (fileContext.getDefaultFileSystem() instanceof LocalFs ||
- fileContext.getDefaultFileSystem() instanceof RawLocalFs) {
+ if (fileSystemWAL.fileContext.getDefaultFileSystem() instanceof LocalFs ||
+ fileSystemWAL.fileContext.getDefaultFileSystem() instanceof RawLocalFs) {
//until the stream is closed on the local FS, readers don't see any data.
close();
} else {
@@ -607,19 +692,38 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
return currentPointer.offset + entryLength > fileSystemWAL.maxLength;
}
- protected void rotate(boolean openNextFile) throws IOException
+ void rotate(boolean openNextFile) throws IOException
{
flush();
close();
- //all parts up to current part can be finalized.
- pendingFinalization.put(fileSystemWAL.getLastCheckpointedWindow(), currentPointer.partNum);
- LOG.debug("rotate {} to {}", currentPointer.partNum, currentPointer.partNum + 1);
+ int partNum = currentPointer.partNum;
+ LOG.debug("rotate {} to {}", partNum, currentPointer.partNum + 1);
currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
if (openNextFile) {
//if adding the new entry to the file can cause the current file to exceed the max length then it is rotated.
outputStream = getOutputStream(currentPointer);
}
+
+ rotated(partNum);
+ }
+
+ /**
+ * Rotates the current part when the wal is in batch mode and the part has reached the max length. In batch mode
+ * the writer does not rotate by itself while appending, so this is meant to be called once a batch is complete.
+ *
+ * @throws IOException if rotation fails
+ */
+ protected void rotateIfNecessary() throws IOException
+ {
+ boolean partCompleted = currentPointer.offset >= fileSystemWAL.maxLength;
+ if (!fileSystemWAL.inBatchMode || !partCompleted) {
+ return;
+ }
+ rotate(false);
+ }
+
+ /**
+ * Hook invoked by {@link #rotate(boolean)} after a part is closed. Records, under the last checkpointed window,
+ * that parts up to {@code partNum} may be finalized when files are finalized for that window.
+ *
+ * @param partNum number of the part that was just rotated out
+ * @throws IOException declared for overriding writers that touch the file system
+ */
+ protected void rotated(int partNum) throws IOException
+ {
+ long checkpointedWindow = fileSystemWAL.getLastCheckpointedWindow();
+ pendingFinalization.put(checkpointedWindow, partNum);
 }
protected void finalizeFiles(long window) throws IOException
@@ -640,14 +744,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
int partToFinalizeTill = entry.getValue();
for (int i = largestPartAvailable; i <= partToFinalizeTill; i++) {
- String tmpToFinalize = fileSystemWAL.tempPartFiles.remove(i);
- Path tmpPath = new Path(tmpToFinalize);
-
- if (fileContext.util().exists(tmpPath)) {
- LOG.debug("finalize {} of part {}", tmpToFinalize, i);
- fileContext.rename(tmpPath, new Path(fileSystemWAL.getPartFilePath(i)), Options.Rename.OVERWRITE);
- latestFinalizedPart = i;
- }
+ finalize(i);
}
largestPartAvailable = partToFinalizeTill + 1;
}
@@ -659,37 +756,60 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil
}
}
+ /**
+ * Finalizes a part: stops tracking its temporary file and, if that file exists, renames it to the final part
+ * path and records the part as the latest finalized one.
+ *
+ * @param partNum part number to finalize
+ * @throws IOException if checking for or renaming the temporary file fails
+ */
+ protected void finalize(int partNum) throws IOException
+ {
+ String tmpToFinalize = fileSystemWAL.tempPartFiles.remove(partNum);
+ if (tmpToFinalize == null) {
+ //nothing tracked for this part (e.g. it was already finalized); new Path(null) would throw
+ LOG.debug("no tmp file to finalize for part {}", partNum);
+ return;
+ }
+ Path tmpPath = new Path(tmpToFinalize);
+ if (fileSystemWAL.fileContext.util().exists(tmpPath)) {
+ LOG.debug("finalize {} of part {}", tmpPath, partNum);
+ fileSystemWAL.fileContext.rename(tmpPath, new Path(fileSystemWAL.getPartFilePath(partNum)),
+ Options.Rename.OVERWRITE);
+ latestFinalizedPart = partNum;
+ }
+ }
+
private DataOutputStream getOutputStream(FileSystemWALPointer pointer) throws IOException
{
Preconditions.checkArgument(outputStream == null, "output stream is not null");
- if (pointer.offset > 0 && (fileContext.getDefaultFileSystem() instanceof LocalFs ||
- fileContext.getDefaultFileSystem() instanceof RawLocalFs)) {
- //on local file system the stream is closed instead of flush so we open it again in append mode if the
- //offset > 0.
- return fileContext.create(new Path(fileSystemWAL.tempPartFiles.get(pointer.partNum)),
+ if (pointer.offset > 0 && (fileSystemWAL.fileContext.getDefaultFileSystem() instanceof LocalFs ||
+ fileSystemWAL.fileContext.getDefaultFileSystem() instanceof RawLocalFs)) {
+ //On local file system the stream is always closed and never flushed so we open it again in append mode if the
+ //offset > 0. This block is entered only when appending to wal while writing on local fs.
+ return fileSystemWAL.fileContext.create(new Path(fileSystemWAL.tempPartFiles.get(pointer.partNum)),
EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), Options.CreateOpts.CreateParent.createParent());
}
String partFile = fileSystemWAL.getPartFilePath(pointer.partNum);
- String tmpFilePath = getTmpFilePath(partFile);
+ String tmpFilePath = createTmpFilePath(partFile);
fileSystemWAL.tempPartFiles.put(pointer.partNum, tmpFilePath);
Preconditions.checkArgument(pointer.offset == 0, "offset > 0");
LOG.debug("open {} => {}", pointer.partNum, tmpFilePath);
- outputStream = fileContext.create(new Path(tmpFilePath),
+ outputStream = fileSystemWAL.fileContext.create(new Path(tmpFilePath),
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent());
return outputStream;
}
+ /**
+ * Package-private so the window data manager can read the writer's position. Returns the live pointer.
+ */
+ FileSystemWALPointer getCurrentPointer()
+ {
+ return this.currentPointer;
+ }
+
+ /**
+ * Package-private so the window data manager can reposition the writer.
+ *
+ * @param pointer new writer position; must not be null
+ * @throws NullPointerException if pointer is null
+ */
+ void setCurrentPointer(@NotNull FileSystemWALPointer pointer)
+ {
+ if (pointer == null) {
+ throw new NullPointerException("pointer");
+ }
+ this.currentPointer = pointer;
+ }
}
- private static String getTmpFilePath(String filePath)
+ /**
+ * Builds a fresh temporary path for a part file by appending '.', the current time in millis and the tmp
+ * extension.
+ */
+ private static String createTmpFilePath(String filePath)
 {
- return filePath + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+ long createdAt = System.currentTimeMillis();
+ return new StringBuilder().append(filePath).append('.').append(createdAt).append(TMP_EXTENSION).toString();
 }
- private static final String TMP_EXTENSION = ".tmp";
+ static final String TMP_EXTENSION = ".tmp";
private static final Logger LOG = LoggerFactory.getLogger(FileSystemWAL.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java
index 45432d5..da21a6d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java
@@ -51,7 +51,7 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
* Provides iterator like interface to read entries from the WAL.
* @param <P> type of Pointer in the WAL
*/
- interface WALReader<P>
+ interface WALReader<P> extends AutoCloseable
{
/**
* Seek to middle of the WAL. This is used primarily during recovery,
@@ -66,6 +66,11 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
Slice next() throws IOException;
/**
+ * Skips the next entry in the WAL without returning its contents.
+ *
+ * @throws IOException if the entry cannot be read or skipped
+ */
+ void skipNext() throws IOException;
+
+ /**
* Returns the start pointer from which data is available to read.<br/>
* WAL Writer supports purging of aged data so the start pointer will change over time.
*
@@ -78,7 +83,7 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter>
* Provide method to write entries to the WAL.
* @param <P> type of Pointer in the WAL
*/
- interface WALWriter<P>
+ interface WALWriter<P> extends AutoCloseable
{
/**
* Write an entry to the WAL
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
index a1917a6..2622a0d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java
@@ -19,37 +19,61 @@
package org.apache.apex.malhar.lib.wal;
import java.io.IOException;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
-import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
/**
- * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window.
- * An idempotent agent cannot make any guarantees about the tuples emitted in the application window which fails.
+ * WindowDataManager manages the state of an operator in every application window. It can be used to replay tuples in
+ * the input operator after re-deployment for a window which was not check-pointed but processing was completed before
+ * failure.<br/>
+ *
+ * However, it cannot make any guarantees about the tuples emitted in the application window during which the operator
+ * failed.<br/>
*
* The order of tuples is guaranteed for ordered input sources.
*
- * <b>Important:</b> In order for an idempotent storage manager to function correctly it cannot allow
+ * <b>Important:</b> In order for a WindowDataManager to function correctly it cannot allow
* checkpoints to occur within an application window and checkpoints must be aligned with
* application window boundaries.
*
* @since 2.0.0
*/
-public interface WindowDataManager extends StorageAgent, Component<Context.OperatorContext>
+public interface WindowDataManager extends Component<Context.OperatorContext>
{
/**
- * Gets the largest window for which there is recovery data.
+ * Save the state for a window id.
+ * @param object state
+ * @param windowId window id
+ * @throws IOException
+ */
+ void save(Object object, long windowId) throws IOException;
+
+ /**
+ * Gets the object saved for the provided window id. <br/>
+ * Typically it is used to replay tuples of successive windows in input operators after failure.
+ *
+ * @param windowId window id
+ * @return saved state for the window id.
+ * @throws IOException
+ */
+ Object retrieve(long windowId) throws IOException;
+
+ /**
+ * Gets the largest window which was completed.
* @return Returns the window id
*/
- long getLargestRecoveryWindow();
+ long getLargestCompletedWindow();
/**
+ * Fetches the state saved for a window id for all the partitions.
+ * <p/>
* When an operator can partition itself dynamically then there is no guarantee that an input state which was being
* handled by one instance previously will be handled by the same instance after partitioning. <br/>
* For eg. An {@link AbstractFileInputOperator} instance which reads a File X till offset l (not check-pointed) may no
@@ -58,39 +82,27 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
* The new instance wouldn't know from what point to read the File X unless it reads the idempotent storage of all the
* operators for the window being replayed and fix it's state.
*
- * @param windowId window id.
- * @return mapping of operator id to the corresponding state
+ * @param windowId window id
+ * @return saved state per operator partitions for the given window.
* @throws IOException
*/
- Map<Integer, Object> load(long windowId) throws IOException;
+ Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException;
/**
- * Delete the artifacts of the operator for windows <= windowId.
+ * Delete the artifacts for windows <= windowId.
*
- * @param operatorId operator id
* @param windowId window id
* @throws IOException
*/
- void deleteUpTo(int operatorId, long windowId) throws IOException;
+ void committed(long windowId) throws IOException;
/**
- * This informs the idempotent storage manager that operator is partitioned so that it can set properties and
- * distribute state.
+ * Creates new window data managers during repartitioning.
*
- * @param newManagers all the new idempotent storage managers.
+ * @param newCount count of new window data managers.
* @param removedOperatorIds set of operator ids which were removed after partitioning.
*/
- void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds);
-
- /**
- * Returns an array of windowIds for which data was stored by atleast one partition. The array
- * of winodwIds is sorted.
- *
- * @return An array of windowIds for which data was stored by atleast one partition. The array
- * of winodwIds is sorted.
- * @throws IOException
- */
- long[] getWindowIds() throws IOException;
+ List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds);
/**
* This {@link WindowDataManager} will never do recovery. This is a convenience class so that operators
@@ -98,21 +110,25 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
*/
class NoopWindowDataManager implements WindowDataManager
{
- @Override
- public long getLargestRecoveryWindow()
+ public long getLargestCompletedWindow()
{
return Stateless.WINDOW_ID;
}
@Override
- public Map<Integer, Object> load(long windowId) throws IOException
+ public Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException
{
return null;
}
@Override
- public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+ public List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds)
{
+ List<WindowDataManager> managers = new ArrayList<>();
+ for (int i = 0; i < newCount; i++) {
+ managers.add(new NoopWindowDataManager());
+ }
+ return managers;
}
@Override
@@ -126,36 +142,19 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
}
@Override
- public void save(Object object, int operatorId, long windowId) throws IOException
+ public void save(Object object, long windowId) throws IOException
{
}
@Override
- public Object load(int operatorId, long windowId) throws IOException
+ public Object retrieve(long windowId) throws IOException
{
return null;
}
@Override
- public void delete(int operatorId, long windowId) throws IOException
- {
- }
-
- @Override
- public void deleteUpTo(int operatorId, long windowId) throws IOException
- {
- }
-
- @Override
- public long[] getWindowIds(int operatorId) throws IOException
- {
- return new long[0];
- }
-
- @Override
- public long[] getWindowIds() throws IOException
+ public void committed(long windowId) throws IOException
{
- return new long[0];
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index 2f3f356..0f9a7c9 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -178,8 +178,8 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest
public void testRecovery() throws IOException
{
int operatorId = 1;
- when(windowDataManagerMock.getLargestRecoveryWindow()).thenReturn(1L);
- when(windowDataManagerMock.load(operatorId, 1)).thenReturn(new MutablePair<Integer, Integer>(0, 4));
+ when(windowDataManagerMock.getLargestCompletedWindow()).thenReturn(1L);
+ when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<Integer, Integer>(0, 4));
insertEvents(10, true, 0);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 7990049..e1f23d1 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.esotericsoftware.kryo.Kryo;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -617,7 +616,7 @@ public class AbstractFileInputOperatorTest
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
FSWindowDataManager manager = new FSWindowDataManager();
- manager.setRecoveryPath(testMeta.dir + "/recovery");
+ manager.setStatePath(testMeta.dir + "/recovery");
oper.setWindowDataManager(manager);
@@ -666,7 +665,7 @@ public class AbstractFileInputOperatorTest
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
FSWindowDataManager manager = new FSWindowDataManager();
- manager.setRecoveryPath(testMeta.dir + "/recovery");
+ manager.setStatePath(testMeta.dir + "/recovery");
oper.setWindowDataManager(manager);
@@ -709,7 +708,7 @@ public class AbstractFileInputOperatorTest
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
FSWindowDataManager manager = new FSWindowDataManager();
- manager.setRecoveryPath(testMeta.dir + "/recovery");
+ manager.setStatePath(testMeta.dir + "/recovery");
oper.setEmitBatchSize(5);
oper.setWindowDataManager(manager);
@@ -772,7 +771,7 @@ public class AbstractFileInputOperatorTest
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
FSWindowDataManager manager = new FSWindowDataManager();
- manager.setRecoveryPath(testMeta.dir + "/recovery");
+ manager.setStatePath(testMeta.dir + "/recovery");
oper.setWindowDataManager(manager);
@@ -817,12 +816,12 @@ public class AbstractFileInputOperatorTest
}
@Test
- public void testIdempotentStorageManagerPartitioning() throws Exception
+ public void testWindowDataManagerPartitioning() throws Exception
{
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
- oper.setWindowDataManager(new TestStorageManager());
+ oper.setWindowDataManager(new FSWindowDataManager());
oper.operatorId = 7;
Path path = new Path(new File(testMeta.dir).getAbsolutePath());
@@ -838,15 +837,15 @@ public class AbstractFileInputOperatorTest
Assert.assertEquals(2, newPartitions.size());
Assert.assertEquals(1, oper.getCurrentPartitions());
- List<TestStorageManager> storageManagers = Lists.newLinkedList();
+ List<FSWindowDataManager> storageManagers = Lists.newLinkedList();
for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
- storageManagers.add((TestStorageManager)p.getPartitionedInstance().getWindowDataManager());
+ storageManagers.add((FSWindowDataManager)p.getPartitionedInstance().getWindowDataManager());
}
Assert.assertEquals("count of storage managers", 2, storageManagers.size());
int countOfDeleteManagers = 0;
- TestStorageManager deleteManager = null;
- for (TestStorageManager storageManager : storageManagers) {
+ FSWindowDataManager deleteManager = null;
+ for (FSWindowDataManager storageManager : storageManagers) {
if (storageManager.getDeletedOperators() != null) {
countOfDeleteManagers++;
deleteManager = storageManager;
@@ -858,17 +857,6 @@ public class AbstractFileInputOperatorTest
Assert.assertEquals("deleted operators", Sets.newHashSet(7), deleteManager.getDeletedOperators());
}
- private static class TestStorageManager extends FSWindowDataManager
- {
- Set<Integer> getDeletedOperators()
- {
- if (deletedOperators != null) {
- return ImmutableSet.copyOf(deletedOperators);
- }
- return null;
- }
- }
-
/** scanner to extract partition id from start of the filename */
static class MyScanner extends AbstractFileInputOperator.DirectoryScanner
{