You are viewing a plain-text version of this content. (The link to the canonical version was lost in this copy.)
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/03 23:37:00 UTC

svn commit: r1573789 - in /incubator/streams/branches/STREAMS-26: ./ streams-contrib/streams-persist-elasticsearch/ streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ streams-contrib/streams-persist-hdfs/ stre...

Author: sblackmon
Date: Mon Mar  3 22:36:59 2014
New Revision: 1573789

URL: http://svn.apache.org/r1573789
Log:
updates to ES 1.0.1, perpetual read from hdfs

Modified:
    incubator/streams/branches/STREAMS-26/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
    incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml

Modified: incubator/streams/branches/STREAMS-26/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/pom.xml Mon Mar  3 22:36:59 2014
@@ -155,6 +155,10 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
         </dependency>
@@ -197,6 +201,11 @@
                 <version>${slf4j.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-log4j12</artifactId>
+                <version>${slf4j.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-classic</artifactId>
                 <version>${logback.version}</version>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml Mon Mar  3 22:36:59 2014
@@ -35,7 +35,7 @@
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>1.0.0</version>
+            <version>1.0.1</version>
             <scope>compile</scope>
             <type>jar</type>
         </dependency>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java Mon Mar  3 22:36:59 2014
@@ -353,6 +353,9 @@ public class ElasticsearchPersistUpdater
     {
         UpdateRequest updateRequest;
 
+        Preconditions.checkNotNull(id);
+        Preconditions.checkNotNull(json);
+
         // They didn't specify an ID, so we will create one for them.
         updateRequest = new UpdateRequest()
             .index(indexName)

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml Mon Mar  3 22:36:59 2014
@@ -84,6 +84,7 @@
                     <sourcePaths>
                         <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.hdfs.pojo</targetPackage>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java Mon Mar  3 22:36:59 2014
@@ -47,7 +47,7 @@ public class WebHdfsPersistWriter implem
     private OutputStreamWriter currentWriter = null;
 
     private static final int  BYTES_IN_MB = 1024*1024;
-    private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+    private static final int  BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
     private volatile int  totalByteCount = 0;
     private volatile int  byteCount = 0;
 

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml Mon Mar  3 22:36:59 2014
@@ -43,6 +43,7 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>commons-io</groupId>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java Mon Mar  3 22:36:59 2014
@@ -28,14 +28,15 @@ public class MoreoverClient {
     public long pullTime;
     private boolean debug;
 
-    public MoreoverClient(String id, String apiKey) {
-        logger.info("Constructed new client for id:{} key:{}", id, apiKey);
+    public MoreoverClient(String id, String apiKey, String sequence) {
+        logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence);
         this.id = id;
         this.apiKey = apiKey;
+        this.lastSequenceId = new BigInteger(sequence);
     }
 
     public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException {
-        String urlString = String.format(BASE_URL, this.apiKey, limit, (sequenceId == null ? 0 : sequenceId));
+        String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId);
         logger.debug("Making call to {}", urlString);
         long start = System.nanoTime();
         MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java Mon Mar  3 22:36:59 2014
@@ -22,10 +22,16 @@ public class MoreoverConfigurator {
 
         List<MoreoverKeyData> apiKeys = Lists.newArrayList();
 
-        for( String key : moreover.getStringList("apiKeys")) {
-            apiKeys.add(new MoreoverKeyData().withId(key).withKey(key));
-            // TODO: implement starting sequence
-        }
+        Config apiKeysConfig = moreover.getConfig("apiKeys");
+
+        if( !apiKeysConfig.isEmpty())
+            for( String apiKeyId : apiKeysConfig.root().keySet() ) {
+                Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
+                apiKeys.add(new MoreoverKeyData()
+                        .withId(apiKeyConfig.getString("key"))
+                        .withKey(apiKeyConfig.getString("key"))
+                        .withStartingSequence(apiKeyConfig.getString("startingSequence")));
+            }
         moreoverConfiguration.setApiKeys(apiKeys);
 
         return moreoverConfiguration;

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java Mon Mar  3 22:36:59 2014
@@ -50,17 +50,14 @@ public class MoreoverProvider implements
 
     @Override
     public synchronized StreamsResultSet readCurrent() {
-        LOGGER.debug("readCurrent");
 
-        LOGGER.info("Providing {} docs", providerQueue.size());
+        LOGGER.debug("readCurrent: {}", providerQueue.size());
 
         Collection<StreamsDatum> currentIterator = Lists.newArrayList();
         Iterators.addAll(currentIterator, providerQueue.iterator());
 
         StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
 
-        LOGGER.info("Exiting");
-
         providerQueue.clear();
 
         return current;

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java Mon Mar  3 22:36:59 2014
@@ -7,6 +7,7 @@ import org.apache.streams.core.StreamsRe
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigInteger;
 import java.util.Queue;
 
 /**
@@ -18,7 +19,7 @@ public class MoreoverProviderTask implem
     public static final int REQUIRED_LATENCY = LATENCY * 1000;
     private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class);
 
-    private final String lastSequence;
+    private String lastSequence;
     private final String apiKey;
     private final String apiId;
     private final Queue<StreamsDatum> results;
@@ -31,7 +32,7 @@ public class MoreoverProviderTask implem
         this.apiKey = apiKey;
         this.results = results;
         this.lastSequence = lastSequence;
-        this.moClient = new MoreoverClient(this.apiId, this.apiKey);
+        this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
         initializeClient(moClient);
     }
 
@@ -40,12 +41,13 @@ public class MoreoverProviderTask implem
         while(true) {
             try {
                 ensureTime(moClient);
-                MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
+                MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
                 started = true;
-                result.process();
+                lastSequence = result.process().toString();
                 for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
                     results.offer(entry);
-                logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
+                logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
+
             } catch (Exception e) {
                 logger.error("Exception while polling moreover", e);
             }
@@ -64,7 +66,7 @@ public class MoreoverProviderTask implem
 
     private void initializeClient(MoreoverClient moClient) {
         try {
-            moClient.getArticlesAfter("0", 2);
+            moClient.getArticlesAfter(this.lastSequence, 2);
         } catch (Exception e) {
             logger.error("Failed to start stream, {}", this.apiKey);
             logger.error("Exception : ", e);

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java Mon Mar  3 22:36:59 2014
@@ -108,7 +108,7 @@ public class MoreoverResult implements I
         return end;
     }
 
-    public void process() {
+    public BigInteger process() {
 
         try {
             this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
@@ -124,11 +124,10 @@ public class MoreoverResult implements I
             logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
             if (sequenceid.compareTo(this.maxSequencedId) > 0) {
                 this.maxSequencedId = sequenceid;
-                logger.debug("New max sequence Id {}", this.maxSequencedId);
             }
-
         }
 
+        return this.maxSequencedId;
     }
 
     public String getXmlString() {

Modified: incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml Mon Mar  3 22:36:59 2014
@@ -90,13 +90,11 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <version>1.6.1</version>
         </dependency>
 
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
-            <version>1.0.9</version>
         </dependency>
 
     </dependencies>