You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/03 23:37:00 UTC
svn commit: r1573789 - in /incubator/streams/branches/STREAMS-26: ./
streams-contrib/streams-persist-elasticsearch/
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/
streams-contrib/streams-persist-hdfs/ stre...
Author: sblackmon
Date: Mon Mar 3 22:36:59 2014
New Revision: 1573789
URL: http://svn.apache.org/r1573789
Log:
updates to ES 1.0.1, perpetual read from hdfs
Modified:
incubator/streams/branches/STREAMS-26/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml
Modified: incubator/streams/branches/STREAMS-26/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/pom.xml Mon Mar 3 22:36:59 2014
@@ -155,6 +155,10 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
@@ -197,6 +201,11 @@
<version>${slf4j.version}</version>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml Mon Mar 3 22:36:59 2014
@@ -35,7 +35,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>1.0.0</version>
+ <version>1.0.1</version>
<scope>compile</scope>
<type>jar</type>
</dependency>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java Mon Mar 3 22:36:59 2014
@@ -353,6 +353,9 @@ public class ElasticsearchPersistUpdater
{
UpdateRequest updateRequest;
+ Preconditions.checkNotNull(id);
+ Preconditions.checkNotNull(json);
+
// They didn't specify an ID, so we will create one for them.
updateRequest = new UpdateRequest()
.index(indexName)
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/pom.xml Mon Mar 3 22:36:59 2014
@@ -84,6 +84,7 @@
<sourcePaths>
<sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json</sourcePath>
<sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.hdfs.pojo</targetPackage>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java Mon Mar 3 22:36:59 2014
@@ -47,7 +47,7 @@ public class WebHdfsPersistWriter implem
private OutputStreamWriter currentWriter = null;
private static final int BYTES_IN_MB = 1024*1024;
- private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+ private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
private volatile int totalByteCount = 0;
private volatile int byteCount = 0;
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/pom.xml Mon Mar 3 22:36:59 2014
@@ -43,6 +43,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java Mon Mar 3 22:36:59 2014
@@ -28,14 +28,15 @@ public class MoreoverClient {
public long pullTime;
private boolean debug;
- public MoreoverClient(String id, String apiKey) {
- logger.info("Constructed new client for id:{} key:{}", id, apiKey);
+ public MoreoverClient(String id, String apiKey, String sequence) {
+ logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence);
this.id = id;
this.apiKey = apiKey;
+ this.lastSequenceId = new BigInteger(sequence);
}
public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException {
- String urlString = String.format(BASE_URL, this.apiKey, limit, (sequenceId == null ? 0 : sequenceId));
+ String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId);
logger.debug("Making call to {}", urlString);
long start = System.nanoTime();
MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java Mon Mar 3 22:36:59 2014
@@ -22,10 +22,16 @@ public class MoreoverConfigurator {
List<MoreoverKeyData> apiKeys = Lists.newArrayList();
- for( String key : moreover.getStringList("apiKeys")) {
- apiKeys.add(new MoreoverKeyData().withId(key).withKey(key));
- // TODO: implement starting sequence
- }
+ Config apiKeysConfig = moreover.getConfig("apiKeys");
+
+ if( !apiKeysConfig.isEmpty())
+ for( String apiKeyId : apiKeysConfig.root().keySet() ) {
+ Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
+ apiKeys.add(new MoreoverKeyData()
+ .withId(apiKeyConfig.getString("key"))
+ .withKey(apiKeyConfig.getString("key"))
+ .withStartingSequence(apiKeyConfig.getString("startingSequence")));
+ }
moreoverConfiguration.setApiKeys(apiKeys);
return moreoverConfiguration;
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java Mon Mar 3 22:36:59 2014
@@ -50,17 +50,14 @@ public class MoreoverProvider implements
@Override
public synchronized StreamsResultSet readCurrent() {
- LOGGER.debug("readCurrent");
- LOGGER.info("Providing {} docs", providerQueue.size());
+ LOGGER.debug("readCurrent: {}", providerQueue.size());
Collection<StreamsDatum> currentIterator = Lists.newArrayList();
Iterators.addAll(currentIterator, providerQueue.iterator());
StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
- LOGGER.info("Exiting");
-
providerQueue.clear();
return current;
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java Mon Mar 3 22:36:59 2014
@@ -7,6 +7,7 @@ import org.apache.streams.core.StreamsRe
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigInteger;
import java.util.Queue;
/**
@@ -18,7 +19,7 @@ public class MoreoverProviderTask implem
public static final int REQUIRED_LATENCY = LATENCY * 1000;
private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class);
- private final String lastSequence;
+ private String lastSequence;
private final String apiKey;
private final String apiId;
private final Queue<StreamsDatum> results;
@@ -31,7 +32,7 @@ public class MoreoverProviderTask implem
this.apiKey = apiKey;
this.results = results;
this.lastSequence = lastSequence;
- this.moClient = new MoreoverClient(this.apiId, this.apiKey);
+ this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
initializeClient(moClient);
}
@@ -40,12 +41,13 @@ public class MoreoverProviderTask implem
while(true) {
try {
ensureTime(moClient);
- MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
+ MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
started = true;
- result.process();
+ lastSequence = result.process().toString();
for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
results.offer(entry);
- logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
+ logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
+
} catch (Exception e) {
logger.error("Exception while polling moreover", e);
}
@@ -64,7 +66,7 @@ public class MoreoverProviderTask implem
private void initializeClient(MoreoverClient moClient) {
try {
- moClient.getArticlesAfter("0", 2);
+ moClient.getArticlesAfter(this.lastSequence, 2);
} catch (Exception e) {
logger.error("Failed to start stream, {}", this.apiKey);
logger.error("Exception : ", e);
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java Mon Mar 3 22:36:59 2014
@@ -108,7 +108,7 @@ public class MoreoverResult implements I
return end;
}
- public void process() {
+ public BigInteger process() {
try {
this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
@@ -124,11 +124,10 @@ public class MoreoverResult implements I
logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
if (sequenceid.compareTo(this.maxSequencedId) > 0) {
this.maxSequencedId = sequenceid;
- logger.debug("New max sequence Id {}", this.maxSequencedId);
}
-
}
+ return this.maxSequencedId;
}
public String getXmlString() {
Modified: incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml?rev=1573789&r1=1573788&r2=1573789&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-pojo/pom.xml Mon Mar 3 22:36:59 2014
@@ -90,13 +90,11 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.6.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
- <version>1.0.9</version>
</dependency>
</dependencies>