You are viewing a plain-text version of this content. (The hyperlink to its canonical version was lost when this text was extracted.)
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:29 UTC
[51/71] [abbrv] git commit: updates to ES 1.0.1, perpetual read from hdfs
Updates to ES (Elasticsearch) 1.0.1, perpetual read from HDFS
git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1573789 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63a85a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63a85a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63a85a83
Branch: refs/heads/master
Commit: 63a85a83f5904f4362ebb91f659cbd6695a60f72
Parents: e6f3d4e
Author: sblackmon <sb...@unknown>
Authored: Mon Mar 3 22:36:59 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Mon Mar 3 22:36:59 2014 +0000
----------------------------------------------------------------------
pom.xml | 9 +++++++++
streams-contrib/streams-persist-elasticsearch/pom.xml | 2 +-
.../elasticsearch/ElasticsearchPersistUpdater.java | 3 +++
streams-contrib/streams-persist-hdfs/pom.xml | 1 +
.../org/apache/streams/hdfs/WebHdfsPersistWriter.java | 2 +-
streams-contrib/streams-provider-moreover/pom.xml | 1 +
.../apache/streams/data/moreover/MoreoverClient.java | 7 ++++---
.../streams/data/moreover/MoreoverConfigurator.java | 14 ++++++++++----
.../streams/data/moreover/MoreoverProvider.java | 5 +----
.../streams/data/moreover/MoreoverProviderTask.java | 14 ++++++++------
.../apache/streams/data/moreover/MoreoverResult.java | 5 ++---
streams-pojo/pom.xml | 2 --
12 files changed, 41 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 366ae94..77d5371 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,6 +155,10 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
@@ -197,6 +201,11 @@
<version>${slf4j.version}</version>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index ee11944..b487e58 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -35,7 +35,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>1.0.0</version>
+ <version>1.0.1</version>
<scope>compile</scope>
<type>jar</type>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index ef6c28d..b55ad2c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -353,6 +353,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
{
UpdateRequest updateRequest;
+ Preconditions.checkNotNull(id);
+ Preconditions.checkNotNull(json);
+
// They didn't specify an ID, so we will create one for them.
updateRequest = new UpdateRequest()
.index(indexName)
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 1142976..1031205 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -84,6 +84,7 @@
<sourcePaths>
<sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json</sourcePath>
<sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/hdfs/HdfsReaderConfiguration.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.hdfs.pojo</targetPackage>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 0fc8407..55c55b3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -47,7 +47,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
private OutputStreamWriter currentWriter = null;
private static final int BYTES_IN_MB = 1024*1024;
- private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+ private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
private volatile int totalByteCount = 0;
private volatile int byteCount = 0;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/pom.xml b/streams-contrib/streams-provider-moreover/pom.xml
index 4988715..521a865 100644
--- a/streams-contrib/streams-provider-moreover/pom.xml
+++ b/streams-contrib/streams-provider-moreover/pom.xml
@@ -43,6 +43,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
index 2e63d9b..a43c4d8 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
@@ -28,14 +28,15 @@ public class MoreoverClient {
public long pullTime;
private boolean debug;
- public MoreoverClient(String id, String apiKey) {
- logger.info("Constructed new client for id:{} key:{}", id, apiKey);
+ public MoreoverClient(String id, String apiKey, String sequence) {
+ logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence);
this.id = id;
this.apiKey = apiKey;
+ this.lastSequenceId = new BigInteger(sequence);
}
public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException {
- String urlString = String.format(BASE_URL, this.apiKey, limit, (sequenceId == null ? 0 : sequenceId));
+ String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId);
logger.debug("Making call to {}", urlString);
long start = System.nanoTime();
MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
index d2589e0..c7e8c17 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
@@ -22,10 +22,16 @@ public class MoreoverConfigurator {
List<MoreoverKeyData> apiKeys = Lists.newArrayList();
- for( String key : moreover.getStringList("apiKeys")) {
- apiKeys.add(new MoreoverKeyData().withId(key).withKey(key));
- // TODO: implement starting sequence
- }
+ Config apiKeysConfig = moreover.getConfig("apiKeys");
+
+ if( !apiKeysConfig.isEmpty())
+ for( String apiKeyId : apiKeysConfig.root().keySet() ) {
+ Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
+ apiKeys.add(new MoreoverKeyData()
+ .withId(apiKeyConfig.getString("key"))
+ .withKey(apiKeyConfig.getString("key"))
+ .withStartingSequence(apiKeyConfig.getString("startingSequence")));
+ }
moreoverConfiguration.setApiKeys(apiKeys);
return moreoverConfiguration;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
index 5fd83eb..85ef7a5 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
@@ -50,17 +50,14 @@ public class MoreoverProvider implements StreamsProvider {
@Override
public synchronized StreamsResultSet readCurrent() {
- LOGGER.debug("readCurrent");
- LOGGER.info("Providing {} docs", providerQueue.size());
+ LOGGER.debug("readCurrent: {}", providerQueue.size());
Collection<StreamsDatum> currentIterator = Lists.newArrayList();
Iterators.addAll(currentIterator, providerQueue.iterator());
StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
- LOGGER.info("Exiting");
-
providerQueue.clear();
return current;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
index 5fa0298..4908ea5 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
@@ -7,6 +7,7 @@ import org.apache.streams.core.StreamsResultSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigInteger;
import java.util.Queue;
/**
@@ -18,7 +19,7 @@ public class MoreoverProviderTask implements Runnable {
public static final int REQUIRED_LATENCY = LATENCY * 1000;
private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class);
- private final String lastSequence;
+ private String lastSequence;
private final String apiKey;
private final String apiId;
private final Queue<StreamsDatum> results;
@@ -31,7 +32,7 @@ public class MoreoverProviderTask implements Runnable {
this.apiKey = apiKey;
this.results = results;
this.lastSequence = lastSequence;
- this.moClient = new MoreoverClient(this.apiId, this.apiKey);
+ this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
initializeClient(moClient);
}
@@ -40,12 +41,13 @@ public class MoreoverProviderTask implements Runnable {
while(true) {
try {
ensureTime(moClient);
- MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
+ MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
started = true;
- result.process();
+ lastSequence = result.process().toString();
for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
results.offer(entry);
- logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
+ logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
+
} catch (Exception e) {
logger.error("Exception while polling moreover", e);
}
@@ -64,7 +66,7 @@ public class MoreoverProviderTask implements Runnable {
private void initializeClient(MoreoverClient moClient) {
try {
- moClient.getArticlesAfter("0", 2);
+ moClient.getArticlesAfter(this.lastSequence, 2);
} catch (Exception e) {
logger.error("Failed to start stream, {}", this.apiKey);
logger.error("Exception : ", e);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
index ed5f305..50e3f66 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
@@ -108,7 +108,7 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
return end;
}
- public void process() {
+ public BigInteger process() {
try {
this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
@@ -124,11 +124,10 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
if (sequenceid.compareTo(this.maxSequencedId) > 0) {
this.maxSequencedId = sequenceid;
- logger.debug("New max sequence Id {}", this.maxSequencedId);
}
-
}
+ return this.maxSequencedId;
}
public String getXmlString() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63a85a83/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index dc75aba..72f8418 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -90,13 +90,11 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.6.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
- <version>1.0.9</version>
</dependency>
</dependencies>