You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/15 17:46:22 UTC
[21/24] storm git commit: fixing issues introduced due to jstorm-related refactoring
Fixing issues introduced due to JStorm-related refactoring.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d17b3b9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d17b3b9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d17b3b9c
Branch: refs/heads/1.x-branch
Commit: d17b3b9c3cbc89d854bfb436d213d11cfd4545ec
Parents: 2c02bc9
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Jan 14 00:40:43 2016 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:57 2016 -0800
----------------------------------------------------------------------
.../jvm/storm/starter/HdfsSpoutTopology.java | 24 ++++++++-------
.../org/apache/storm/hdfs/spout/FileLock.java | 11 +++++--
.../org/apache/storm/hdfs/spout/FileReader.java | 1 -
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 32 +++++++++++---------
.../storm/hdfs/spout/ProgressTracker.java | 10 +++---
.../apache/storm/hdfs/spout/TestHdfsSpout.java | 9 +++---
6 files changed, 48 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
index ca6b045..191886c 100644
--- a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
@@ -18,18 +18,20 @@
package storm.starter;
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.starter.FastWordCountTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
import org.apache.storm.hdfs.spout.Configs;
import org.apache.storm.hdfs.spout.HdfsSpout;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.topology.*;
-import backtype.storm.tuple.*;
-import backtype.storm.task.*;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.*;
+import org.apache.storm.tuple.*;
+import org.apache.storm.task.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,7 +123,7 @@ public class HdfsSpoutTopology {
// 3 - Create and configure topology
conf.setDebug(true);
conf.setNumWorkers(WORKER_NUM);
- conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
+ conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID, spout, spoutNum);
http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 0217cf9..a7cb2b8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -112,8 +112,7 @@ public class FileLock {
/** returns lock on file or null if file is already locked. throws if unexpected problem */
public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
throws IOException {
- String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
- Path lockFile = new Path(lockFileName);
+ Path lockFile = new Path(lockDirPath, fileToLock.getName());
try {
FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
@@ -148,7 +147,13 @@ public class FileLock {
// timestamp in last line of file to see when the last update was made
LogEntry lastEntry = getLastEntry(fs, lockFile);
if(lastEntry==null) {
- throw new RuntimeException(lockFile.getName() + " is empty. this file is invalid.");
+ LOG.warn("Empty lock file found. Deleting it. {}", lockFile);
+ try {
+ if(!fs.delete(lockFile, false))
+ throw new IOException("Empty lock file deletion failed");
+ } catch (Exception e) {
+ LOG.error("Unable to delete empty lock file " + lockFile, e);
+ }
}
if( lastEntry.eventTime <= olderThan )
return lastEntry;
http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
index 1cb1f59..54a90d4 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
@@ -18,7 +18,6 @@
package org.apache.storm.hdfs.spout;
-import backtype.storm.tuple.Fields;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 5428570..06896b2 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -30,7 +30,7 @@ import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import backtype.storm.Config;
+import org.apache.storm.Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,11 +39,11 @@ import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
public class HdfsSpout extends BaseRichSpout {
@@ -309,7 +309,7 @@ public class HdfsSpout extends BaseRichSpout {
Map<String, Object> map = (Map<String, Object>)conf.get(configKey);
if(map != null) {
for(String keyName : map.keySet()){
- LOG.info("HDFS Config override : " + keyName + " = " + String.valueOf(map.get(keyName)));
+ LOG.info("HDFS Config override : {} = {} ", keyName, String.valueOf(map.get(keyName)));
this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName)));
}
try {
@@ -373,9 +373,9 @@ public class HdfsSpout extends BaseRichSpout {
this.ackEnabled = (ackerCount>0);
LOG.debug("ACKer count = {}", ackerCount);
}
- else {
- this.ackEnabled = false;
- LOG.debug("No ACKers config found");
+ else { // ackers==null when ackerCount not explicitly set on the topology
+ this.ackEnabled = true;
+ LOG.debug("ACK count not explicitly set on topology.");
}
LOG.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled");
@@ -393,13 +393,15 @@ public class HdfsSpout extends BaseRichSpout {
}
}
- // -- max duplicate
- if( conf.get(Configs.MAX_OUTSTANDING) !=null )
- maxOutstanding = Integer.parseInt( conf.get(Configs.MAX_OUTSTANDING).toString() );
+ // -- max outstanding tuples
+ if( conf.get(Configs.MAX_OUTSTANDING) !=null ) {
+ maxOutstanding = Integer.parseInt(conf.get(Configs.MAX_OUTSTANDING).toString());
+ }
// -- clocks in sync
- if( conf.get(Configs.CLOCKS_INSYNC) !=null )
+ if( conf.get(Configs.CLOCKS_INSYNC) !=null ) {
clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString());
+ }
// -- spout id
spoutId = context.getThisComponentId();
@@ -530,7 +532,7 @@ public class HdfsSpout extends BaseRichSpout {
/**
* If clocks in sync, then acquires the oldest expired lock
* Else, on the first call, just remembers the oldest expired lock; on the next call, checks whether that lock has been updated and, if it has not, acquires it.
- * @return
+ * @return a lock object
* @throws IOException
*/
private FileLock getOldestExpiredLock() throws IOException {
http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
index d7de3ed..e2e7126 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
@@ -25,7 +25,7 @@ public class ProgressTracker {
TreeSet<FileOffset> offsets = new TreeSet<>();
- public void recordAckedOffset(FileOffset newOffset) {
+ public synchronized void recordAckedOffset(FileOffset newOffset) {
if(newOffset==null) {
return;
}
@@ -40,7 +40,7 @@ public class ProgressTracker {
// remove contiguous elements from the head of the heap
// e.g.: 1,2,3,4,10,11,12,15 => 4,10,11,12,15
- private void trimHead() {
+ private synchronized void trimHead() {
if(offsets.size()<=1) {
return;
}
@@ -53,18 +53,18 @@ public class ProgressTracker {
return;
}
- public FileOffset getCommitPosition() {
+ public synchronized FileOffset getCommitPosition() {
if(!offsets.isEmpty()) {
return offsets.first().clone();
}
return null;
}
- public void dumpState(PrintStream stream) {
+ public synchronized void dumpState(PrintStream stream) {
stream.println(offsets);
}
- public int size() {
+ public synchronized int size() {
return offsets.size();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 835a714..330afe9 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -18,9 +18,9 @@
package org.apache.storm.hdfs.spout;
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
+import org.apache.storm.Config;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Writable;
@@ -44,7 +44,6 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.BufferedReader;
import java.io.File;
@@ -58,6 +57,7 @@ import java.util.List;
import java.util.Map;
import org.apache.storm.hdfs.common.HdfsUtils.Pair;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
public class TestHdfsSpout {
@@ -557,6 +557,7 @@ public class TestHdfsSpout {
conf.put(Configs.ARCHIVE_DIR, archive.toString());
conf.put(Configs.BAD_DIR, badfiles.toString());
conf.put(Configs.HDFS_URI, hdfsCluster.getURI().toString());
+ conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");
return conf;
}