You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:29 UTC

[51/71] [abbrv] git commit: updates to ES 1.0.1, perpetual read from hdfs

Update to Elasticsearch (ES) 1.0.1; add perpetual read from HDFS

git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1573789 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63a85a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63a85a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63a85a83

Branch: refs/heads/master
Commit: 63a85a83f5904f4362ebb91f659cbd6695a60f72
Parents: e6f3d4e
Author: sblackmon <sb...@unknown>
Authored: Mon Mar 3 22:36:59 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Mon Mar 3 22:36:59 2014 +0000

----------------------------------------------------------------------
 pom.xml                                               |  9 +++++++++
 streams-contrib/streams-persist-elasticsearch/pom.xml |  2 +-
 .../elasticsearch/ElasticsearchPersistUpdater.java    |  3 +++
 streams-contrib/streams-persist-hdfs/pom.xml          |  1 +
 .../org/apache/streams/hdfs/WebHdfsPersistWriter.java |  2 +-
 streams-contrib/streams-provider-moreover/pom.xml     |  1 +
 .../apache/streams/data/moreover/MoreoverClient.java  |  7 ++++---
 .../streams/data/moreover/MoreoverConfigurator.java   | 14 ++++++++++----
 .../streams/data/moreover/MoreoverProvider.java       |  5 +----
 .../streams/data/moreover/MoreoverProviderTask.java   | 14 ++++++++------
 .../apache/streams/data/moreover/MoreoverResult.java  |  5 ++---
 streams-pojo/pom.xml                                  |  2 --
 12 files changed, 41 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 366ae94..77d5371 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,6 +155,10 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
         </dependency>
@@ -197,6 +201,11 @@
                 <version>${slf4j.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-log4j12</artifactId>
+                <version>${slf4j.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-classic</artifactId>
                 <version>${logback.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index ee11944..b487e58 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -35,7 +35,7 @@
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>1.0.0</version>
+            <version>1.0.1</version>
             <scope>compile</scope>
             <type>jar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index ef6c28d..b55ad2c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -353,6 +353,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
     {
         UpdateRequest updateRequest;
 
+        Preconditions.checkNotNull(id);
+        Preconditions.checkNotNull(json);
+
         // They didn't specify an ID, so we will create one for them.
         updateRequest = new UpdateRequest()
             .index(indexName)

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 1142976..1031205 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -84,6 +84,7 @@
                     <sourcePaths>
                         <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.hdfs.pojo</targetPackage>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 0fc8407..55c55b3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -47,7 +47,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     private OutputStreamWriter currentWriter = null;
 
     private static final int  BYTES_IN_MB = 1024*1024;
-    private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+    private static final int  BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
     private volatile int  totalByteCount = 0;
     private volatile int  byteCount = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/pom.xml b/streams-contrib/streams-provider-moreover/pom.xml
index 4988715..521a865 100644
--- a/streams-contrib/streams-provider-moreover/pom.xml
+++ b/streams-contrib/streams-provider-moreover/pom.xml
@@ -43,6 +43,7 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>commons-io</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
index 2e63d9b..a43c4d8 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
@@ -28,14 +28,15 @@ public class MoreoverClient {
     public long pullTime;
     private boolean debug;
 
-    public MoreoverClient(String id, String apiKey) {
-        logger.info("Constructed new client for id:{} key:{}", id, apiKey);
+    public MoreoverClient(String id, String apiKey, String sequence) {
+        logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence);
         this.id = id;
         this.apiKey = apiKey;
+        this.lastSequenceId = new BigInteger(sequence);
     }
 
     public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException {
-        String urlString = String.format(BASE_URL, this.apiKey, limit, (sequenceId == null ? 0 : sequenceId));
+        String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId);
         logger.debug("Making call to {}", urlString);
         long start = System.nanoTime();
         MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
index d2589e0..c7e8c17 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
@@ -22,10 +22,16 @@ public class MoreoverConfigurator {
 
         List<MoreoverKeyData> apiKeys = Lists.newArrayList();
 
-        for( String key : moreover.getStringList("apiKeys")) {
-            apiKeys.add(new MoreoverKeyData().withId(key).withKey(key));
-            // TODO: implement starting sequence
-        }
+        Config apiKeysConfig = moreover.getConfig("apiKeys");
+
+        if( !apiKeysConfig.isEmpty())
+            for( String apiKeyId : apiKeysConfig.root().keySet() ) {
+                Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
+                apiKeys.add(new MoreoverKeyData()
+                        .withId(apiKeyConfig.getString("key"))
+                        .withKey(apiKeyConfig.getString("key"))
+                        .withStartingSequence(apiKeyConfig.getString("startingSequence")));
+            }
         moreoverConfiguration.setApiKeys(apiKeys);
 
         return moreoverConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
index 5fd83eb..85ef7a5 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
@@ -50,17 +50,14 @@ public class MoreoverProvider implements StreamsProvider {
 
     @Override
     public synchronized StreamsResultSet readCurrent() {
-        LOGGER.debug("readCurrent");
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
+        LOGGER.debug("readCurrent: {}", providerQueue.size());
 
         Collection<StreamsDatum> currentIterator = Lists.newArrayList();
         Iterators.addAll(currentIterator, providerQueue.iterator());
 
         StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
 
-        LOGGER.info("Exiting");
-
         providerQueue.clear();
 
         return current;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
index 5fa0298..4908ea5 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
@@ -7,6 +7,7 @@ import org.apache.streams.core.StreamsResultSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigInteger;
 import java.util.Queue;
 
 /**
@@ -18,7 +19,7 @@ public class MoreoverProviderTask implements Runnable {
     public static final int REQUIRED_LATENCY = LATENCY * 1000;
     private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class);
 
-    private final String lastSequence;
+    private String lastSequence;
     private final String apiKey;
     private final String apiId;
     private final Queue<StreamsDatum> results;
@@ -31,7 +32,7 @@ public class MoreoverProviderTask implements Runnable {
         this.apiKey = apiKey;
         this.results = results;
         this.lastSequence = lastSequence;
-        this.moClient = new MoreoverClient(this.apiId, this.apiKey);
+        this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
         initializeClient(moClient);
     }
 
@@ -40,12 +41,13 @@ public class MoreoverProviderTask implements Runnable {
         while(true) {
             try {
                 ensureTime(moClient);
-                MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
+                MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
                 started = true;
-                result.process();
+                lastSequence = result.process().toString();
                 for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
                     results.offer(entry);
-                logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
+                logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
+
             } catch (Exception e) {
                 logger.error("Exception while polling moreover", e);
             }
@@ -64,7 +66,7 @@ public class MoreoverProviderTask implements Runnable {
 
     private void initializeClient(MoreoverClient moClient) {
         try {
-            moClient.getArticlesAfter("0", 2);
+            moClient.getArticlesAfter(this.lastSequence, 2);
         } catch (Exception e) {
             logger.error("Failed to start stream, {}", this.apiKey);
             logger.error("Exception : ", e);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
index ed5f305..50e3f66 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
@@ -108,7 +108,7 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
         return end;
     }
 
-    public void process() {
+    public BigInteger process() {
 
         try {
             this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
@@ -124,11 +124,10 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
             logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
             if (sequenceid.compareTo(this.maxSequencedId) > 0) {
                 this.maxSequencedId = sequenceid;
-                logger.debug("New max sequence Id {}", this.maxSequencedId);
             }
-
         }
 
+        return this.maxSequencedId;
     }
 
     public String getXmlString() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index dc75aba..72f8418 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -90,13 +90,11 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <version>1.6.1</version>
         </dependency>
 
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
-            <version>1.0.9</version>
         </dependency>
 
     </dependencies>