Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/15 17:46:22 UTC

[21/24] storm git commit: fixing issues introduced by the jstorm-related refactoring

fixing issues introduced by the jstorm-related refactoring


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d17b3b9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d17b3b9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d17b3b9c

Branch: refs/heads/1.x-branch
Commit: d17b3b9c3cbc89d854bfb436d213d11cfd4545ec
Parents: 2c02bc9
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Jan 14 00:40:43 2016 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:57 2016 -0800

----------------------------------------------------------------------
 .../jvm/storm/starter/HdfsSpoutTopology.java    | 24 ++++++++-------
 .../org/apache/storm/hdfs/spout/FileLock.java   | 12 ++++++--
 .../org/apache/storm/hdfs/spout/FileReader.java |  1 -
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 32 +++++++++++---------
 .../storm/hdfs/spout/ProgressTracker.java       | 10 +++---
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  |  9 +++---
 6 files changed, 49 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
index ca6b045..191886c 100644
--- a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
@@ -18,18 +18,20 @@
 
 package storm.starter;
 
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.starter.FastWordCountTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.hdfs.spout.Configs;
 import org.apache.storm.hdfs.spout.HdfsSpout;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.topology.*;
-import backtype.storm.tuple.*;
-import backtype.storm.task.*;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.*;
+import org.apache.storm.tuple.*;
+import org.apache.storm.task.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,7 +123,7 @@ public class HdfsSpoutTopology {
     // 3 - Create and configure topology
     conf.setDebug(true);
     conf.setNumWorkers(WORKER_NUM);
-    conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
+    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
 
     TopologyBuilder builder = new TopologyBuilder();
     builder.setSpout(SPOUT_ID, spout, spoutNum);
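
The change above is representative of the whole 1.x migration: classes move from the backtype.storm namespace to org.apache.storm, but their APIs are unchanged, so only imports and fully qualified names need touching. A minimal sketch of the metrics-consumer registration in its migrated form (the worker count and the one-task parallelism hint are illustrative, not from this commit):

    import org.apache.storm.Config;
    import org.apache.storm.metric.LoggingMetricsConsumer;

    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(1);
    // runs one consumer task that writes all topology metrics to the worker logs
    conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);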

http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 0217cf9..a7cb2b8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -112,8 +112,7 @@ public class FileLock {
   /** returns lock on file or null if file is already locked. throws if unexpected problem */
   public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
           throws IOException {
-    String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
-    Path lockFile = new Path(lockFileName);
+    Path lockFile = new Path(lockDirPath, fileToLock.getName());
 
     try {
       FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
@@ -148,7 +147,14 @@ public class FileLock {
       // timestamp in last line of file to see when the last update was made
       LogEntry lastEntry =  getLastEntry(fs, lockFile);
       if(lastEntry==null) {
-        throw new RuntimeException(lockFile.getName() + " is empty. this file is invalid.");
+        LOG.warn("Empty lock file found. Deleting it. {}", lockFile);
+        try {
+          if(!fs.delete(lockFile, false))
+            throw new IOException("Empty lock file deletion failed");
+        } catch (Exception e) {
+          LOG.error("Unable to delete empty lock file " + lockFile, e);
+        }
+        return null;  // lastEntry is null here, so skip the eventTime check below
       }
       if( lastEntry.eventTime <= olderThan )
         return lastEntry;
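
Two things change above. The first hunk replaces manual separator concatenation with Hadoop's two-argument Path constructor, which joins parent and child on its own; a minimal sketch (the paths are illustrative):

    import org.apache.hadoop.fs.Path;

    Path lockDir = new Path("/storm/locks");
    // Path(parent, child) inserts the separator itself; no string assembly needed
    Path lockFile = new Path(lockDir, "data-file.txt");   // "/storm/locks/data-file.txt"

The second hunk downgrades an empty lock file from a fatal RuntimeException to a warning: the file is deleted and null is returned, so the caller treats the lock as unusable instead of crashing the spout.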

http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
index 1cb1f59..54a90d4 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.hdfs.spout;
 
-import backtype.storm.tuple.Fields;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 5428570..06896b2 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -30,7 +30,7 @@ import java.util.TimerTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import backtype.storm.Config;
+import org.apache.storm.Config;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,11 +39,11 @@ import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
 
 public class HdfsSpout extends BaseRichSpout {
 
@@ -309,7 +309,7 @@ public class HdfsSpout extends BaseRichSpout {
       Map<String, Object> map = (Map<String, Object>)conf.get(configKey);
         if(map != null) {
           for(String keyName : map.keySet()){
-            LOG.info("HDFS Config override : " + keyName + " = " + String.valueOf(map.get(keyName)));
+            LOG.info("HDFS Config override : {} = {} ", keyName, String.valueOf(map.get(keyName)));
             this.hdfsConfig.set(keyName, String.valueOf(map.get(keyName)));
           }
           try {
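
The logging line above moves from string concatenation to SLF4J's parameterized form: the {} placeholders are substituted only when the INFO level is actually enabled, so suppressed log statements no longer pay for building the message. A minimal sketch of the pattern (the LogSketch class is illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LogSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LogSketch.class);

      void logOverride(String keyName, Object value) {
        // no concatenation happens unless INFO is enabled
        LOG.info("HDFS Config override : {} = {}", keyName, String.valueOf(value));
      }
    }
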
@@ -373,9 +373,9 @@ public class HdfsSpout extends BaseRichSpout {
       this.ackEnabled = (ackerCount>0);
       LOG.debug("ACKer count = {}", ackerCount);
     }
-    else {
-      this.ackEnabled = false;
-      LOG.debug("No ACKers config found");
+    else { // ackers==null when ackerCount not explicitly set on the topology
+      this.ackEnabled = true;
+      LOG.debug("ACK count not explicitly set on topology.");
     }
 
     LOG.info("ACK mode is {}", ackEnabled ? "enabled" : "disabled");
@@ -393,13 +393,15 @@ public class HdfsSpout extends BaseRichSpout {
       }
     }
 
-    // -- max duplicate
-    if( conf.get(Configs.MAX_OUTSTANDING) !=null )
-      maxOutstanding = Integer.parseInt( conf.get(Configs.MAX_OUTSTANDING).toString() );
+    // -- max outstanding tuples
+    if( conf.get(Configs.MAX_OUTSTANDING) !=null ) {
+      maxOutstanding = Integer.parseInt(conf.get(Configs.MAX_OUTSTANDING).toString());
+    }
 
     // -- clocks in sync
-    if( conf.get(Configs.CLOCKS_INSYNC) !=null )
+    if( conf.get(Configs.CLOCKS_INSYNC) !=null ) {
       clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString());
+    }
 
     // -- spout id
     spoutId = context.getThisComponentId();
@@ -530,7 +532,7 @@ public class HdfsSpout extends BaseRichSpout {
   /**
    * If clocks in sync, then acquires the oldest expired lock
   * Else, on the first call, just remembers the oldest expired lock; on the next call, acquires it if it has not been updated in the meantime
-   * @return
+   * @return a lock object
    * @throws IOException
    */
   private FileLock getOldestExpiredLock() throws IOException {
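
The acker hunk above reverses the old default: when TOPOLOGY_ACKER_EXECUTORS is absent from the topology config, Storm launches ackers anyway (one per worker by default), so the spout must assume acking is on. A minimal sketch of the decision, assuming conf is the topology configuration map handed to open():

    Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
    boolean ackEnabled;
    if (ackers != null) {
      // explicitly configured; setting it to 0 turns acking off
      ackEnabled = Integer.parseInt(ackers.toString()) > 0;
    } else {
      // not set: Storm falls back to its default acker count, so acking is enabled
      ackEnabled = true;
    }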

http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
index d7de3ed..e2e7126 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
@@ -25,7 +25,7 @@ public class ProgressTracker {
 
   TreeSet<FileOffset> offsets = new TreeSet<>();
 
-  public void recordAckedOffset(FileOffset newOffset) {
+  public synchronized void recordAckedOffset(FileOffset newOffset) {
     if(newOffset==null) {
       return;
     }
@@ -40,7 +40,7 @@ public class ProgressTracker {
 
   // remove contiguous elements from the head of the heap
   // e.g.:  1,2,3,4,10,11,12,15  =>  4,10,11,12,15
-  private void trimHead() {
+  private synchronized void trimHead() {
     if(offsets.size()<=1) {
       return;
     }
@@ -53,18 +53,18 @@ public class ProgressTracker {
     return;
   }
 
-  public FileOffset getCommitPosition() {
+  public synchronized FileOffset getCommitPosition() {
     if(!offsets.isEmpty()) {
       return offsets.first().clone();
     }
     return null;
   }
 
-  public void dumpState(PrintStream stream) {
+  public synchronized void dumpState(PrintStream stream) {
     stream.println(offsets);
   }
 
-  public int size() {
+  public synchronized int size() {
     return offsets.size();
   }
 }
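
Every public entry point of ProgressTracker now serializes on the tracker's own monitor, which matters because acked offsets are recorded and the commit position queried from different threads, and TreeSet is not thread-safe. The keyword form is equivalent to an explicit synchronized (this) block; a sketch of one method written that way (the elided body is assumed to add the offset and trim, as the hunk context suggests):

    public void recordAckedOffset(FileOffset newOffset) {
      synchronized (this) {   // same monitor all the synchronized methods share
        if (newOffset == null) {
          return;
        }
        offsets.add(newOffset);
        trimHead();           // intrinsic locks are reentrant, so this nested acquire is safe
      }
    }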

http://git-wip-us.apache.org/repos/asf/storm/blob/d17b3b9c/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 835a714..330afe9 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -18,9 +18,9 @@
 
 package org.apache.storm.hdfs.spout;
 
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
+import org.apache.storm.Config;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Writable;
@@ -44,7 +44,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -58,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.storm.hdfs.common.HdfsUtils.Pair;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 
 public class TestHdfsSpout {
@@ -557,6 +557,7 @@ public class TestHdfsSpout {
     conf.put(Configs.ARCHIVE_DIR, archive.toString());
     conf.put(Configs.BAD_DIR, badfiles.toString());
     conf.put(Configs.HDFS_URI, hdfsCluster.getURI().toString());
+    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");
     return conf;
   }
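
The added TOPOLOGY_ACKER_EXECUTORS entry ties back to the HdfsSpout change above: since an unset acker count now means ack mode is enabled, the tests must opt out explicitly so the spout does not do pending-ack bookkeeping for acks that never arrive. A minimal sketch of the same opt-out on its own:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Config;

    Map<String, Object> conf = new HashMap<>();
    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");   // zero ackers: acking disabled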