You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/19 20:00:29 UTC
[1/2] git commit: Cleaning up contrib modules Preparing to merge
STREAMS-26
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-26 34990168b -> 3a192885a
Cleaning up contrib modules
Preparing to merge STREAMS-26
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f9165129
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f9165129
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f9165129
Branch: refs/heads/STREAMS-26
Commit: f916512905fc911ee7bc41fe0cef7478dc0cdefb
Parents: 3499016
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Mar 19 13:36:33 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Mar 19 13:36:33 2014 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 11 ++++----
.../streams/hdfs/WebHdfsPersistReader.java | 27 +++++++++++++-------
.../streams/hdfs/WebHdfsPersistReaderTask.java | 4 +--
.../streams/rss/provider/RssStreamProvider.java | 2 +-
.../streams/rss/test/Top100FeedsTest.java | 4 ++-
.../streams-provider-rss.iml | 16 +++---------
.../apache/streams/sysomos/SysomosProvider.java | 20 +++++++--------
7 files changed, 43 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 0cd5db7..37ee3b7 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -41,14 +41,13 @@
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
- <!--<module>streams-provider-datasift</module>-->
- <!--<module>streams-provider-facebook</module>-->
- <!--<module>streams-provider-gnip</module>-->
+ <module>streams-provider-datasift</module>
+ <module>streams-provider-facebook</module>
+ <module>streams-provider-gnip</module>
<module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
- <!--<module>streams-provider-sysomos</module>-->
- <!--<module>streams-provider-rss</module>-->
- <!--<module>streams-proxy-semantria</module>-->
+ <module>streams-provider-sysomos</module>
+ <module>streams-provider-rss</module>
<module>streams-components-test</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 659c517..511f684 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -170,26 +170,35 @@ public class WebHdfsPersistReader implements StreamsPersistReader {
private void readSourceWritePersistQueue() {
for( FileStatus fileStatus : status ) {
BufferedReader reader;
-
- if( fileStatus.isFile() && !fileStatus.getPath().getName().endsWith("_SUCCESS")) {
+ LOGGER.info("Found " + fileStatus.getPath().getName());
+ if( persistQueue.size() > 0 ) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
+ LOGGER.info("Processing " + fileStatus.getPath().getName());
try {
reader = new BufferedReader(new InputStreamReader(client.open(fileStatus.getPath())));
- String line;
+ String line = "";
do{
try {
line = reader.readLine();
- if( line != null ) {
+ if( !Strings.isNullOrEmpty(line) ) {
String[] fields = line.split(Character.toString(DELIMITER));
- persistQueue.offer(new StreamsDatum(fields[3]));
+ StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(fields[2]));
+ persistQueue.offer(entry);
}
- } catch (IOException e) {
- break;
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
}
} while( line != null );
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
- break;
+ LOGGER.warn(e.getMessage());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 6cd1e79..f0bee1f 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -29,7 +29,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
for( FileStatus fileStatus : reader.status ) {
BufferedReader bufferedReader;
- if( fileStatus.isFile() && !fileStatus.getPath().getName().endsWith("_SUCCESS")) {
+ if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
try {
bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath())));
@@ -45,7 +45,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
reader.persistQueue.offer(entry);
}
} catch (Exception e) {
- LOGGER.warn("Failed processing " + line);
+ LOGGER.warn("Failed reading " + line);
}
} while( line != null );
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index 449f187..c4eee04 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -31,7 +31,7 @@ import java.util.concurrent.*;
/**
* Created by sblackmon on 12/10/13.
*/
-public class RssStreamProvider implements StreamsProvider, Serializable {
+public class RssStreamProvider implements StreamsProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java
index 0c17979..1277553 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/Top100FeedsTest.java
@@ -2,6 +2,7 @@ package org.apache.streams.rss.test;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
+import org.apache.streams.core.tasks.StreamsProviderTask;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.RssStreamConfiguration;
@@ -53,7 +54,8 @@ public class Top100FeedsTest{
configuration.setFeeds(feeds);
RssStreamProvider provider = new RssStreamProvider(configuration, Activity.class);
- provider.start();
+ provider.prepare(configuration);
+ provider.startStream();
try {
Thread.sleep(10000);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-provider-rss/streams-provider-rss.iml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/streams-provider-rss.iml b/streams-contrib/streams-provider-rss/streams-provider-rss.iml
index 2846a74..032b03a 100644
--- a/streams-contrib/streams-provider-rss/streams-provider-rss.iml
+++ b/streams-contrib/streams-provider-rss/streams-provider-rss.iml
@@ -1,20 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
-<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
- <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
- <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
- <excludeFolder url="file://$MODULE_DIR$/target/classes" />
- <excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" />
- <excludeFolder url="file://$MODULE_DIR$/target/maven-shared-archive-resources" />
- <excludeFolder url="file://$MODULE_DIR$/target/maven-status" />
- <excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" />
- <excludeFolder url="file://$MODULE_DIR$/target/test-classes" />
+ <excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
@@ -35,9 +28,6 @@
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="module" module-name="streams-core" />
- <orderEntry type="module" module-name="streams-util" />
- <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
- <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
<orderEntry type="module" module-name="streams-pojo" />
@@ -57,6 +47,8 @@
<orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
<orderEntry type="module" module-name="streams-config" />
+ <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
+ <orderEntry type="library" name="Maven: com.google.collections:google-collections:1.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project" />
<orderEntry type="library" name="Maven: com.jayway.jsonpath:json-path:0.9.0" level="project" />
<orderEntry type="library" name="Maven: net.minidev:json-smart:1.2" level="project" />
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9165129/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
index 29c0e60..4a5e3ba 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosProvider.java
@@ -64,7 +64,7 @@ public class SysomosProvider implements StreamsProvider {
ScheduledExecutorService service;
@Override
- public void start() {
+ public void startStream() {
LOGGER.trace("Starting Producer");
if(!started) {
LOGGER.trace("Producer not started. Initializing");
@@ -80,27 +80,27 @@ public class SysomosProvider implements StreamsProvider {
}
@Override
- public void stop() {
- started = false;
+ public StreamsResultSet readCurrent() {
+ return null;
}
@Override
- public Queue<StreamsDatum> getProviderQueue() {
- return providerQueue;
+ public StreamsResultSet readNew(BigInteger bigInteger) {
+ return null;
}
@Override
- public StreamsResultSet readCurrent() {
+ public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
return null;
}
@Override
- public StreamsResultSet readNew(BigInteger bigInteger) {
- return null;
+ public void prepare(Object configurationObject) {
+
}
@Override
- public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
- return null;
+ public void cleanUp() {
+
}
}
[2/2] git commit: Fixed long-running test
Posted by sb...@apache.org.
Fixed long-running test
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3a192885
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3a192885
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3a192885
Branch: refs/heads/STREAMS-26
Commit: 3a192885a7dc13c07dabb915a7223ca748fc1e59
Parents: f916512
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Mar 19 14:00:19 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Mar 19 14:00:19 2014 -0500
----------------------------------------------------------------------
provision/provision.iml | 4 ++--
.../streams-components-test/streams-components-test.iml | 12 +++++-------
.../streams/core/builders/LocalStreamBuilder.java | 2 +-
.../streams/core/builders/LocalStreamBuilderTest.java | 10 +++++-----
4 files changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3a192885/provision/provision.iml
----------------------------------------------------------------------
diff --git a/provision/provision.iml b/provision/provision.iml
index fb87ba5..b1475a5 100644
--- a/provision/provision.iml
+++ b/provision/provision.iml
@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
- <output url="file://$MODULE_DIR$/target/classes" />
- <output-test url="file://$MODULE_DIR$/target/test-classes" />
+ <output url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/classes" />
+ <output-test url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/test-classes" />
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3a192885/streams-contrib/streams-components-test/streams-components-test.iml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-components-test/streams-components-test.iml b/streams-contrib/streams-components-test/streams-components-test.iml
index 0fcef34..7bc994e 100644
--- a/streams-contrib/streams-components-test/streams-components-test.iml
+++ b/streams-contrib/streams-components-test/streams-components-test.iml
@@ -1,26 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
- <output url="file://$MODULE_DIR$/target/classes" />
- <output-test url="file://$MODULE_DIR$/target/test-classes" />
+ <output url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/classes" />
+ <output-test url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/test-classes" />
<content url="file://$MODULE_DIR$">
- <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
- <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
- <orderEntry type="module" module-name="streams-core (3)" />
+ <orderEntry type="module" module-name="streams-core" />
<orderEntry type="library" name="Maven: joda-time:joda-time:2.2" level="project" />
- <orderEntry type="module" module-name="streams-util (3)" />
+ <orderEntry type="module" module-name="streams-util" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
- <orderEntry type="module" module-name="streams-pojo (3)" />
+ <orderEntry type="module" module-name="streams-pojo" />
<orderEntry type="library" name="Maven: org.jsonschema2pojo:jsonschema2pojo-core:0.4.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.2.1" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.2.1" level="project" />
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3a192885/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
index 7744be7..3e99827 100644
--- a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
@@ -152,7 +152,7 @@ public class LocalStreamBuilder implements StreamBuilder{
isRunning = isRunning || task.isRunning();
}
if(isRunning) {
- Thread.sleep(100000);
+ Thread.sleep(3000);
}
}
this.executor.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3a192885/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java b/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java
index cc10938..9177459 100644
--- a/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java
+++ b/streams-core/src/test/java/org/apache/streams/core/builders/LocalStreamBuilderTest.java
@@ -75,7 +75,7 @@ public class LocalStreamBuilderTest {
@Test
public void testBasicLinearStream2() {
- int numDatums = 1000;
+ int numDatums = 100;
StreamBuilder builder = new LocalStreamBuilder();
PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
SystemOutWriter writer = new SystemOutWriter();
@@ -94,8 +94,8 @@ public class LocalStreamBuilderTest {
@Test
public void testParallelLinearStream1() {
- int numDatums = 10000;
- int parallelHint = 40;
+ int numDatums = 1000;
+ int parallelHint = 20;
PassthroughDatumCounterProcessor.sawData = new HashSet<Integer>();
PassthroughDatumCounterProcessor.claimedNumber = new HashSet<Integer>();
StreamBuilder builder = new LocalStreamBuilder();
@@ -119,7 +119,7 @@ public class LocalStreamBuilderTest {
@Test
public void testBasicMergeStream() {
int numDatums1 = 1;
- int numDatums2 = 1000;
+ int numDatums2 = 100;
PassthroughDatumCounterProcessor processor1 = new PassthroughDatumCounterProcessor();
PassthroughDatumCounterProcessor processor2 = new PassthroughDatumCounterProcessor();
SystemOutWriter writer = new SystemOutWriter();
@@ -141,7 +141,7 @@ public class LocalStreamBuilderTest {
@Test
public void testBasicBranch() {
- int numDatums = 1000;
+ int numDatums = 100;
StreamBuilder builder = new LocalStreamBuilder();
builder.newReadCurrentStream("prov1", new NumericMessageProvider(numDatums))
.addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")