Posted to commits@storm.apache.org by da...@apache.org on 2015/08/14 20:01:23 UTC
[07/12] storm git commit: Support exactly once always; added unit tests.
Support exactly once always; added unit tests.
1. Enable exactly once irrespective of the rotation policy in use.
2. For TimedRotationPolicy, set a flag when the timer fires and do the actual rotation in doCommit.
3. Added unit tests and updated README.md as per the new behavior.
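
For illustration, a Trident state wired up as below now keeps exactly-once semantics even with a TimedRotationPolicy. This is a minimal sketch in the style of the storm-hdfs README, using the org.apache.storm.hdfs.trident classes touched by this change; the path, field names, fs URL and 10-minute interval are placeholder values:

Fields hdfsFields = new Fields("field1", "field2");

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/trident")
        .withPrefix("trident")
        .withExtension(".txt");

RecordFormat recordFormat = new DelimitedRecordFormat()
        .withFields(hdfsFields);

// The timer now only sets a flag; the rotation itself happens in doCommit,
// so replayed batches can still be recovered exactly once.
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(10.0f, TimedRotationPolicy.TimeUnit.MINUTES);

HdfsState.Options options = new HdfsState.HdfsFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:8020");

StateFactory factory = new HdfsStateFactory().withOptions(options);
// stream is the Trident Stream being persisted
TridentState state = stream.partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());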
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cdabbb3d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cdabbb3d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cdabbb3d
Branch: refs/heads/master
Commit: cdabbb3db273895a79939f40894b999167532125
Parents: df63f75
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Aug 10 13:09:53 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Mon Aug 10 13:09:53 2015 +0530
----------------------------------------------------------------------
external/storm-hdfs/README.md | 9 +-
external/storm-hdfs/pom.xml | 11 +
.../apache/storm/hdfs/trident/HdfsState.java | 142 +++++--------
.../trident/rotation/TimedRotationPolicy.java | 27 ++-
.../storm/hdfs/trident/HdfsStateTest.java | 206 +++++++++++++++++++
5 files changed, 299 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index bb4a618..98fcc55 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -319,12 +319,9 @@ that of the bolts.
### Note
Whenever a batch is replayed by storm (due to failures), the trident state implementation automatically removes
-duplicates from the current data file by copying the data up to the last transaction to another file . Since this
-operation involves a lot of data copy, the exactly once semantics is enabled only if `FileSizeRotationPolicy` with
-file size less than 1 GB is specified.
-
-The exactly once semantics is automatically disabled if `FileSizeRotationPolicy` with size greater than 1 GB or
-`TimedRotationPolicy` is in use.
+duplicates from the current data file by copying the data up to the last transaction to another file. Since this
+operation involves copying a lot of data, ensure that the data files are rotated at reasonable sizes with `FileSizeRotationPolicy`
+and at reasonable intervals with `TimedRotationPolicy` so that recovery can complete within `topology.message.timeout.secs`.
##Working with Secure HDFS
If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We
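
A rough sizing example for the note above (the copy rate is an assumption, not a measured figure): a file rotated at about 1 GB that must be copied during recovery at roughly 100 MB/s takes on the order of 10 seconds, so the message timeout needs comfortable headroom above that. The timeout can be raised via the topology config, e.g.:

Config conf = new Config();
// Hypothetical numbers: ~1 GB rotation size at ~100 MB/s effective copy rate
// is roughly 10 s of recovery copy, so leave generous headroom here.
conf.setMessageTimeoutSecs(120);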
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 7e927bf..15e6f72 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -68,6 +68,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 3dc566c..8b29f66 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -54,8 +54,6 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
public class HdfsState implements State {
@@ -70,12 +68,7 @@ public class HdfsState implements State {
protected int rotation = 0;
protected transient Configuration hdfsConfig;
protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
- protected transient Object writeLock;
- protected transient Timer rotationTimer;
- /**
- * This is on by default unless TimedRotationPolicy is in use.
- */
- private boolean exactlyOnce = true;
+
abstract void closeOutputFile() throws IOException;
@@ -91,29 +84,21 @@ public class HdfsState implements State {
abstract void doRecover(Path srcPath, long nBytes) throws Exception;
- protected boolean isExactlyOnce() {
- return this.exactlyOnce;
- }
-
protected void rotateOutputFile(boolean doRotateAction) throws IOException {
LOG.info("Rotating output file...");
long start = System.currentTimeMillis();
- synchronized (this.writeLock) {
- closeOutputFile();
- this.rotation++;
- Path newFile = createOutputFile();
- if (doRotateAction) {
- LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
- for (RotationAction action : this.rotationActions) {
- action.execute(this.fs, this.currentFile);
- }
+ closeOutputFile();
+ this.rotation++;
+ Path newFile = createOutputFile();
+ if (doRotateAction) {
+ LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
+ for (RotationAction action : this.rotationActions) {
+ action.execute(this.fs, this.currentFile);
}
- this.currentFile = newFile;
}
+ this.currentFile = newFile;
long time = System.currentTimeMillis() - start;
LOG.info("File rotation took {} ms.", time);
-
-
}
protected void rotateOutputFile() throws IOException {
@@ -122,20 +107,17 @@ public class HdfsState implements State {
void prepare(Map conf, int partitionIndex, int numPartitions) {
- this.writeLock = new Object();
if (this.rotationPolicy == null) {
throw new IllegalStateException("RotationPolicy must be specified.");
} else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
- long limit = FileSizeRotationPolicy.Units.GB.getByteCount();
- if(((FileSizeRotationPolicy) rotationPolicy).getMaxBytes() > limit) {
- LOG.warn("*** Exactly once semantics is not supported for FileSizeRotationPolicy with size > 1 GB ***");
- LOG.warn("Turning off exactly once.");
- this.exactlyOnce = false;
- }
+ long rotationBytes = ((FileSizeRotationPolicy) rotationPolicy).getMaxBytes();
+ LOG.warn("FileSizeRotationPolicy specified with {} bytes.", rotationBytes);
+ LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big with the FileSizeRotationPolicy.");
} else if (this.rotationPolicy instanceof TimedRotationPolicy) {
- LOG.warn("*** Exactly once semantics is not supported for TimedRotationPolicy ***");
- LOG.warn("Turning off exactly once.");
- this.exactlyOnce = false;
+ LOG.warn("TimedRotationPolicy specified with interval {} ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
+ LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big with the TimedRotationPolicy.");
}
if (this.fsUrl == null) {
throw new IllegalStateException("File system URL must be specified.");
@@ -158,19 +140,7 @@ public class HdfsState implements State {
}
if (this.rotationPolicy instanceof TimedRotationPolicy) {
- long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
- this.rotationTimer = new Timer(true);
- TimerTask task = new TimerTask() {
- @Override
- public void run() {
- try {
- rotateOutputFile();
- } catch (IOException e) {
- LOG.warn("IOException during scheduled file rotation.", e);
- }
- }
- };
- this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+ ((TimedRotationPolicy) this.rotationPolicy).start();
}
}
@@ -181,13 +151,16 @@ public class HdfsState implements State {
private void recover(String srcFile, long nBytes) {
try {
Path srcPath = new Path(srcFile);
+ rotateOutputFile(false);
+ this.rotationPolicy.reset();
if (nBytes > 0) {
- rotateOutputFile(false);
- this.rotationPolicy.reset();
doRecover(srcPath, nBytes);
LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+ } else {
+ LOG.info("Nothing to recover from {}", srcFile);
}
fs.delete(srcPath, false);
+ LOG.info("Deleted file {} that had partial commits.", srcFile);
} catch (Exception e) {
LOG.warn("Recovery failed.", e);
throw new RuntimeException(e);
@@ -251,7 +224,7 @@ public class HdfsState implements State {
@Override
void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
- LOG.info("Preparing HDFS Bolt...");
+ LOG.info("Preparing HDFS File state...");
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
}
@@ -262,17 +235,15 @@ public class HdfsState implements State {
@Override
public void doCommit(Long txId) throws IOException {
- synchronized (writeLock) {
- if (this.rotationPolicy.mark(this.offset)) {
- rotateOutputFile();
- this.offset = 0;
- this.rotationPolicy.reset();
+ if (this.rotationPolicy.mark(this.offset)) {
+ rotateOutputFile();
+ this.offset = 0;
+ this.rotationPolicy.reset();
+ } else {
+ if (this.out instanceof HdfsDataOutputStream) {
+ ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
} else {
- if (this.out instanceof HdfsDataOutputStream) {
- ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
- } else {
- this.out.hsync();
- }
+ this.out.hsync();
}
}
}
@@ -308,12 +279,10 @@ public class HdfsState implements State {
@Override
public void execute(List<TridentTuple> tuples) throws IOException {
- synchronized (this.writeLock) {
- for (TridentTuple tuple : tuples) {
- byte[] bytes = this.format.format(tuple);
- out.write(bytes);
- this.offset += bytes.length;
- }
+ for (TridentTuple tuple : tuples) {
+ byte[] bytes = this.format.format(tuple);
+ out.write(bytes);
+ this.offset += bytes.length;
}
}
}
@@ -381,13 +350,11 @@ public class HdfsState implements State {
@Override
public void doCommit(Long txId) throws IOException {
- synchronized (writeLock) {
- if (this.rotationPolicy.mark(this.writer.getLength())) {
- rotateOutputFile();
- this.rotationPolicy.reset();
- } else {
- this.writer.hsync();
- }
+ if (this.rotationPolicy.mark(this.writer.getLength())) {
+ rotateOutputFile();
+ this.rotationPolicy.reset();
+ } else {
+ this.writer.hsync();
}
}
@@ -425,9 +392,7 @@ public class HdfsState implements State {
@Override
public void execute(List<TridentTuple> tuples) throws IOException {
for (TridentTuple tuple : tuples) {
- synchronized (this.writeLock) {
- this.writer.append(this.format.key(tuple), this.format.value(tuple));
- }
+ this.writer.append(this.format.key(tuple), this.format.value(tuple));
}
}
@@ -468,9 +433,7 @@ public class HdfsState implements State {
void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
this.options.prepare(conf, partitionIndex, numPartitions);
- if (options.isExactlyOnce()) {
- initLastTxn(conf, partitionIndex);
- }
+ initLastTxn(conf, partitionIndex);
}
private TxnRecord readTxnRecord(Path path) throws IOException {
@@ -558,15 +521,13 @@ public class HdfsState implements State {
@Override
public void beginCommit(Long txId) {
- if (options.isExactlyOnce()) {
- if (txId <= lastSeenTxn.txnid) {
- LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
- long start = System.currentTimeMillis();
- options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
- LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
- }
- updateIndex(txId);
+ if (txId <= lastSeenTxn.txnid) {
+ LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
+ long start = System.currentTimeMillis();
+ options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+ LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
}
+ updateIndex(txId);
}
@Override
@@ -587,4 +548,11 @@ public class HdfsState implements State {
throw new FailedException(e);
}
}
+
+ /**
+ * for unit tests
+ */
+ void close() throws IOException {
+ this.options.closeOutputFile();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index f407d1d..74e7fab 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -19,6 +19,10 @@ package org.apache.storm.hdfs.trident.rotation;
import storm.trident.tuple.TridentTuple;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class TimedRotationPolicy implements FileRotationPolicy {
@@ -41,6 +45,9 @@ public class TimedRotationPolicy implements FileRotationPolicy {
}
private long interval;
+ private Timer rotationTimer;
+ private AtomicBoolean rotationTimerTriggered = new AtomicBoolean();
+
public TimedRotationPolicy(float count, TimeUnit units){
this.interval = (long)(count * units.getMilliSeconds());
@@ -54,12 +61,12 @@ public class TimedRotationPolicy implements FileRotationPolicy {
*/
@Override
public boolean mark(TridentTuple tuple, long offset) {
- return false;
+ return rotationTimerTriggered.get();
}
@Override
public boolean mark(long offset) {
- return false;
+ return rotationTimerTriggered.get();
}
/**
@@ -67,10 +74,24 @@ public class TimedRotationPolicy implements FileRotationPolicy {
*/
@Override
public void reset() {
-
+ rotationTimerTriggered.set(false);
}
public long getInterval(){
return this.interval;
}
+
+ /**
+ * Start the timer to run at fixed intervals.
+ */
+ public void start() {
+ rotationTimer = new Timer(true);
+ TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ rotationTimerTriggered.set(true);
+ }
+ };
+ rotationTimer.scheduleAtFixedRate(task, interval, interval);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/cdabbb3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
new file mode 100644
index 0000000..e1f45df
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
@@ -0,0 +1,206 @@
+package org.apache.storm.hdfs.trident;
+
+import backtype.storm.Config;
+import backtype.storm.tuple.Fields;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
+import org.apache.storm.hdfs.trident.format.FileNameFormat;
+import org.apache.storm.hdfs.trident.format.RecordFormat;
+import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class HdfsStateTest {
+
+ private static final String TEST_OUT_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "trident-unit-test").toString();
+
+ private static final String FILE_NAME_PREFIX = "hdfs-data-";
+ private static final String TEST_TOPOLOGY_NAME = "test-topology";
+ private static final String INDEX_FILE_PREFIX = ".index.";
+ private final TestFileNameFormat fileNameFormat = new TestFileNameFormat();
+
+ private static class TestFileNameFormat implements FileNameFormat {
+ private String currentFileName = "";
+
+ @Override
+ public void prepare(Map conf, int partitionIndex, int numPartitions) {
+
+ }
+
+ @Override
+ public String getName(long rotation, long timeStamp) {
+ currentFileName = FILE_NAME_PREFIX + Long.toString(rotation);
+ return currentFileName;
+ }
+
+ @Override
+ public String getPath() {
+ return TEST_OUT_DIR;
+ }
+
+ public String getCurrentFileName() {
+ return currentFileName;
+ }
+ }
+
+ private HdfsState createHdfsState() {
+
+ Fields hdfsFields = new Fields("f1");
+
+ RecordFormat recordFormat = new DelimitedRecordFormat().withFields(hdfsFields);
+
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+ HdfsState.Options options = new HdfsState.HdfsFileOptions()
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(recordFormat)
+ .withRotationPolicy(rotationPolicy)
+ .withFsUrl("file://" + TEST_OUT_DIR);
+
+ Map<String, String> conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_NAME, TEST_TOPOLOGY_NAME);
+
+ HdfsState state = new HdfsState(options);
+ state.prepare(conf, null, 0, 1);
+ return state;
+ }
+
+ private List<TridentTuple> createMockTridentTuples(int count) {
+ TridentTuple tuple = mock(TridentTuple.class);
+ when(tuple.getValueByField(any(String.class))).thenReturn("data");
+ List<TridentTuple> tuples = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ tuples.add(tuple);
+ }
+ return tuples;
+ }
+
+ private List<String> getLinesFromCurrentDataFile() throws IOException {
+ Path dataFile = Paths.get(TEST_OUT_DIR, fileNameFormat.getCurrentFileName());
+ List<String> lines = Files.readAllLines(dataFile, Charset.defaultCharset());
+ return lines;
+ }
+
+ @Before
+ public void setUp() {
+ FileUtils.deleteQuietly(new File(TEST_OUT_DIR));
+ }
+
+
+ @Test
+ public void testPrepare() throws Exception {
+ HdfsState state = createHdfsState();
+ Collection<File> files = FileUtils.listFiles(new File(TEST_OUT_DIR), null, false);
+ File hdfsDataFile = Paths.get(TEST_OUT_DIR, FILE_NAME_PREFIX + "0").toFile();
+ Assert.assertTrue(files.contains(hdfsDataFile));
+ }
+
+ @Test
+ public void testIndexFileCreation() throws Exception {
+ HdfsState state = createHdfsState();
+ state.beginCommit(1L);
+ Collection<File> files = FileUtils.listFiles(new File(TEST_OUT_DIR), null, false);
+ File hdfsIndexFile = Paths.get(TEST_OUT_DIR, INDEX_FILE_PREFIX + TEST_TOPOLOGY_NAME + ".0").toFile();
+ Assert.assertTrue(files.contains(hdfsIndexFile));
+ }
+
+ @Test
+ public void testUpdateState() throws Exception {
+ HdfsState state = createHdfsState();
+ state.beginCommit(1L);
+ int tupleCount = 100;
+ state.updateState(createMockTridentTuples(tupleCount), null);
+ state.commit(1L);
+ state.close();
+ List<String> lines = getLinesFromCurrentDataFile();
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < tupleCount; i++) {
+ expected.add("data");
+ }
+ Assert.assertEquals(tupleCount, lines.size());
+ Assert.assertEquals(expected, lines);
+ }
+
+ @Test
+ public void testRecoverOneBatch() throws Exception {
+ HdfsState state = createHdfsState();
+ // batch 1 is played with 25 tuples initially.
+ state.beginCommit(1L);
+ state.updateState(createMockTridentTuples(25), null);
+ // batch 1 is replayed with 50 tuples.
+ int replayBatchSize = 50;
+ state.beginCommit(1L);
+ state.updateState(createMockTridentTuples(replayBatchSize), null);
+ state.commit(1L);
+ // close the state to force flush
+ state.close();
+ // Ensure that the original batch1 is discarded and new one is persisted.
+ List<String> lines = getLinesFromCurrentDataFile();
+ Assert.assertEquals(replayBatchSize, lines.size());
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < replayBatchSize; i++) {
+ expected.add("data");
+ }
+ Assert.assertEquals(expected, lines);
+ }
+
+ @Test
+ public void testRecoverMultipleBatches() throws Exception {
+ HdfsState state = createHdfsState();
+
+ // batch 1
+ int batch1Count = 10;
+ state.beginCommit(1L);
+ state.updateState(createMockTridentTuples(batch1Count), null);
+ state.commit(1L);
+
+ // batch 2
+ int batch2Count = 20;
+ state.beginCommit(2L);
+ state.updateState(createMockTridentTuples(batch2Count), null);
+ state.commit(2L);
+
+ // batch 3
+ int batch3Count = 30;
+ state.beginCommit(3L);
+ state.updateState(createMockTridentTuples(batch3Count), null);
+ state.commit(3L);
+
+ // batch 3 replayed with 40 tuples
+ int batch3ReplayCount = 40;
+ state.beginCommit(3L);
+ state.updateState(createMockTridentTuples(batch3ReplayCount), null);
+ state.commit(3L);
+ state.close();
+ /*
+ * total tuples should be
+ * recovered (batch-1 + batch-2) + replayed (batch-3)
+ */
+ List<String> lines = getLinesFromCurrentDataFile();
+ int preReplayCount = batch1Count + batch2Count + batch3Count;
+ int expectedTupleCount = batch1Count + batch2Count + batch3ReplayCount;
+
+ Assert.assertNotEquals(preReplayCount, lines.size());
+ Assert.assertEquals(expectedTupleCount, lines.size());
+ }
+}
\ No newline at end of file