You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:45 UTC
[04/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java
index b7f777e..0c7af1e 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DatumCounterWriter.java
@@ -18,11 +18,16 @@
package org.apache.streams.local.test.writer;
-import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
-import java.util.*;
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -31,79 +36,79 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class DatumCounterWriter implements StreamsPersistWriter{
- @Override
- public String getId() {
- return "DatumCounterWriter";
- }
+ @Override
+ public String getId() {
+ return "DatumCounterWriter";
+ }
- /**
- * Set of all ids that have been claimed. Ensures all instances are assigned unique ids
- */
- public static Set<Integer> CLAIMED_ID = new HashSet<Integer>();
- /**
- * Random instance to generate ids
- */
- public static final Random RAND = new Random();
- /**
- * Set of instance ids that received data. Usefully for testing parrallelization is actually working.
- */
- public final static Set<Integer> SEEN_DATA = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
- /**
- * The total count of data seen by a all instances of a processor.
- */
- public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();
- /**
- * The documents received
- */
- public static final ConcurrentHashMap<String, List<Object>> RECEIVED = new ConcurrentHashMap<>();
+ /**
+ * Set of all ids that have been claimed. Ensures all instances are assigned unique ids
+ */
+ public static Set<Integer> CLAIMED_ID = new HashSet<Integer>();
+ /**
+ * Random instance to generate ids
+ */
+ public static final Random RAND = new Random();
+ /**
+ * Set of instance ids that received data. Usefully for testing parrallelization is actually working.
+ */
+ public final static Set<Integer> SEEN_DATA = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
+ /**
+ * The total count of data seen by a all instances of a processor.
+ */
+ public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();
+ /**
+ * The documents received
+ */
+ public static final ConcurrentHashMap<String, List<Object>> RECEIVED = new ConcurrentHashMap<>();
- private int counter = 0;
- private String writerId;
- private Integer id;
+ private int counter = 0;
+ private String writerId;
+ private Integer id;
- public DatumCounterWriter(String writerId) {
- this.writerId = writerId;
- }
+ public DatumCounterWriter(String writerId) {
+ this.writerId = writerId;
+ }
- @Override
- public void write(StreamsDatum entry) {
- ++this.counter;
- SEEN_DATA.add(this.id);
- synchronized (RECEIVED) {
- List<Object> documents = RECEIVED.get(this.writerId);
- if(documents == null) {
- List<Object> docs = Lists.newLinkedList();
- docs.add(entry.getDocument());
- RECEIVED.put(this.writerId, docs);
- } else {
- documents.add(entry.getDocument());
- }
- }
+ @Override
+ public void write(StreamsDatum entry) {
+ ++this.counter;
+ SEEN_DATA.add(this.id);
+ synchronized (RECEIVED) {
+ List<Object> documents = RECEIVED.get(this.writerId);
+ if(documents == null) {
+ List<Object> docs = Lists.newLinkedList();
+ docs.add(entry.getDocument());
+ RECEIVED.put(this.writerId, docs);
+ } else {
+ documents.add(entry.getDocument());
+ }
}
+ }
- @Override
- public void prepare(Object configurationObject) {
- synchronized (CLAIMED_ID) {
- this.id = RAND.nextInt();
- while(!CLAIMED_ID.add(this.id)) {
- this.id = RAND.nextInt();
- }
- }
+ @Override
+ public void prepare(Object configurationObject) {
+ synchronized (CLAIMED_ID) {
+ this.id = RAND.nextInt();
+ while(!CLAIMED_ID.add(this.id)) {
+ this.id = RAND.nextInt();
+ }
}
+ }
- @Override
- public void cleanUp() {
- synchronized (COUNTS) {
- AtomicLong count = COUNTS.get(this.writerId);
- if(count == null) {
- COUNTS.put(this.writerId, new AtomicLong(this.counter));
- } else {
- count.addAndGet(this.counter);
- }
- }
+ @Override
+ public void cleanUp() {
+ synchronized (COUNTS) {
+ AtomicLong count = COUNTS.get(this.writerId);
+ if(count == null) {
+ COUNTS.put(this.writerId, new AtomicLong(this.counter));
+ } else {
+ count.addAndGet(this.counter);
+ }
}
+ }
- public int getDatumsCounted() {
- return this.counter;
- }
+ public int getDatumsCounted() {
+ return this.counter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java
index d9ec6d3..48f4b68 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/DoNothingWriter.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.test.writer;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,25 +29,25 @@ import org.slf4j.LoggerFactory;
*/
public class DoNothingWriter implements StreamsPersistWriter {
- private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingWriter.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingWriter.class);
- @Override
- public String getId() {
- return "DoNothingWriter";
- }
+ @Override
+ public String getId() {
+ return "DoNothingWriter";
+ }
- @Override
- public void write(StreamsDatum entry) {
+ @Override
+ public void write(StreamsDatum entry) {
- }
+ }
- @Override
- public void prepare(Object configurationObject) {
+ @Override
+ public void prepare(Object configurationObject) {
- }
+ }
- @Override
- public void cleanUp() {
- LOGGER.debug("Writer Clean Up!");
- }
+ @Override
+ public void cleanUp() {
+ LOGGER.debug("Writer Clean Up!");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java
index 76ce353..2711ae1 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/writer/SystemOutWriter.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.test.writer;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,25 +29,25 @@ import org.slf4j.LoggerFactory;
*/
public class SystemOutWriter implements StreamsPersistWriter {
- private final static Logger LOGGER = LoggerFactory.getLogger(SystemOutWriter.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(SystemOutWriter.class);
- @Override
- public String getId() {
- return "SystemOutWriter";
- }
+ @Override
+ public String getId() {
+ return "SystemOutWriter";
+ }
- @Override
- public void write(StreamsDatum entry) {
- System.out.println(entry.document);
- }
+ @Override
+ public void write(StreamsDatum entry) {
+ System.out.println(entry.document);
+ }
- @Override
- public void prepare(Object configurationObject) {
+ @Override
+ public void prepare(Object configurationObject) {
- }
+ }
- @Override
- public void cleanUp() {
- LOGGER.debug("Clean up called writer!");
- }
+ @Override
+ public void cleanUp() {
+ LOGGER.debug("Clean up called writer!");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
index 80d4a24..16b98c4 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/ExpectedDatumsPersistWriter.java
@@ -20,58 +20,57 @@ package org.apache.streams.test.component;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
-import static org.junit.Assert.*;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Scanner;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
/**
* Created by rebanks on 2/27/14.
*/
public class ExpectedDatumsPersistWriter implements StreamsPersistWriter{
- @Override
- public String getId() {
- return "ExpectedDatumsPersistWriter";
- }
+ @Override
+ public String getId() {
+ return "ExpectedDatumsPersistWriter";
+ }
- private StreamsDatumConverter converter;
- private String fileName;
- private List<StreamsDatum> expectedDatums;
- private int counted = 0;
- private int expectedSize = 0;
+ private StreamsDatumConverter converter;
+ private String fileName;
+ private List<StreamsDatum> expectedDatums;
+ private int counted = 0;
+ private int expectedSize = 0;
- public ExpectedDatumsPersistWriter(StreamsDatumConverter converter, String filePathInResources) {
- this.converter = converter;
- this.fileName = filePathInResources;
- }
+ public ExpectedDatumsPersistWriter(StreamsDatumConverter converter, String filePathInResources) {
+ this.converter = converter;
+ this.fileName = filePathInResources;
+ }
- @Override
- public void write(StreamsDatum entry) {
- int index = this.expectedDatums.indexOf(entry);
- assertNotEquals("Datum not expected. "+entry.toString(), -1, index);
- this.expectedDatums.remove(index);
- ++this.counted;
- }
+ @Override
+ public void write(StreamsDatum entry) {
+ int index = this.expectedDatums.indexOf(entry);
+ assertNotEquals("Datum not expected. "+entry.toString(), -1, index);
+ this.expectedDatums.remove(index);
+ ++this.counted;
+ }
- @Override
- public void prepare(Object configurationObject) {
- Scanner scanner = new Scanner(ExpectedDatumsPersistWriter.class.getResourceAsStream(this.fileName));
- this.expectedDatums = new LinkedList<StreamsDatum>();
- while(scanner.hasNextLine()) {
- this.expectedDatums.add(this.converter.convert(scanner.nextLine()));
- }
- this.expectedSize = this.expectedDatums.size();
+ @Override
+ public void prepare(Object configurationObject) {
+ Scanner scanner = new Scanner(ExpectedDatumsPersistWriter.class.getResourceAsStream(this.fileName));
+ this.expectedDatums = new LinkedList<StreamsDatum>();
+ while(scanner.hasNextLine()) {
+ this.expectedDatums.add(this.converter.convert(scanner.nextLine()));
}
+ this.expectedSize = this.expectedDatums.size();
+ }
- @Override
- public void cleanUp() {
- assertEquals("Did not received the expected number of StreamsDatums", this.expectedSize, this.counted);
- }
+ @Override
+ public void cleanUp() {
+ assertEquals("Did not received the expected number of StreamsDatums", this.expectedSize, this.counted);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
index 41e7eed..0fbfae9 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
@@ -18,10 +18,11 @@
package org.apache.streams.test.component;
-import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+
+import com.google.common.collect.Queues;
import org.joda.time.DateTime;
import java.math.BigInteger;
@@ -37,64 +38,64 @@ import java.util.Scanner;
*/
public class FileReaderProvider implements StreamsProvider {
- private String fileName;
- private Scanner scanner;
- private StreamsDatumConverter converter;
-
- public FileReaderProvider(String filePathInResources, StreamsDatumConverter converter) {
- this.fileName = filePathInResources;
- this.converter = converter;
- }
-
- @Override
- public String getId() {
- return "FileReaderProvider";
- }
-
- @Override
- public void startStream() {
-
- }
-
- @Override
- public StreamsResultSet readCurrent() {
- return new StreamsResultSet(constructQueue(this.scanner));
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isRunning() {
- return this.scanner != null && this.scanner.hasNextLine();
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName));
- }
-
- @Override
- public void cleanUp() {
- if(this.scanner!= null) {
- this.scanner.close();
- this.scanner = null;
- }
+ private String fileName;
+ private Scanner scanner;
+ private StreamsDatumConverter converter;
+
+ public FileReaderProvider(String filePathInResources, StreamsDatumConverter converter) {
+ this.fileName = filePathInResources;
+ this.converter = converter;
+ }
+
+ @Override
+ public String getId() {
+ return "FileReaderProvider";
+ }
+
+ @Override
+ public void startStream() {
+
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ return new StreamsResultSet(constructQueue(this.scanner));
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.scanner != null && this.scanner.hasNextLine();
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName));
+ }
+
+ @Override
+ public void cleanUp() {
+ if(this.scanner!= null) {
+ this.scanner.close();
+ this.scanner = null;
}
+ }
- private Queue<StreamsDatum> constructQueue(Scanner scanner) {
- Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue();
- while(scanner.hasNextLine()) {
- data.add(converter.convert(scanner.nextLine()));
- }
- cleanUp();
- return data;
+ private Queue<StreamsDatum> constructQueue(Scanner scanner) {
+ Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue();
+ while(scanner.hasNextLine()) {
+ data.add(converter.convert(scanner.nextLine()));
}
+ cleanUp();
+ return data;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
index e3b7dd1..9172167 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StreamsDatumConverter.java
@@ -27,5 +27,5 @@ import java.io.Serializable;
*/
public interface StreamsDatumConverter extends Serializable {
- public StreamsDatum convert(String s);
+ public StreamsDatum convert(String s);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
index 6f4e620..3727aa1 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/StringToDocumentConverter.java
@@ -25,9 +25,9 @@ import org.apache.streams.core.StreamsDatum;
*/
public class StringToDocumentConverter implements StreamsDatumConverter {
- @Override
- public StreamsDatum convert(String s) {
- return new StreamsDatum(s);
- }
+ @Override
+ public StreamsDatum convert(String s) {
+ return new StreamsDatum(s);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java
index 935c8fe..5154ea3 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestComponentsLocalStream.java
@@ -23,6 +23,7 @@ import org.apache.streams.test.component.ExpectedDatumsPersistWriter;
import org.apache.streams.test.component.FileReaderProvider;
import org.apache.streams.test.component.StringToDocumentConverter;
import org.apache.streams.util.ComponentUtils;
+
import org.junit.After;
import org.junit.Test;
@@ -31,22 +32,22 @@ import org.junit.Test;
*/
public class TestComponentsLocalStream {
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
+ }
- @Test
- public void testLocalStreamWithComponent() {
- LocalStreamBuilder builder = new LocalStreamBuilder();
- builder.newReadCurrentStream("provider", new FileReaderProvider("/TestFile.txt",
- new StringToDocumentConverter()));
- builder.addStreamsPersistWriter("writer", new ExpectedDatumsPersistWriter(new StringToDocumentConverter(),
- "/TestFile.txt"), 1, "provider")
+ @Test
+ public void testLocalStreamWithComponent() {
+ LocalStreamBuilder builder = new LocalStreamBuilder();
+ builder.newReadCurrentStream("provider", new FileReaderProvider("/TestFile.txt",
+ new StringToDocumentConverter()));
+ builder.addStreamsPersistWriter("writer", new ExpectedDatumsPersistWriter(new StringToDocumentConverter(),
+ "/TestFile.txt"), 1, "provider")
.start();
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
index 11e891b..0535295 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestExpectedDatumsPersitWriter.java
@@ -22,8 +22,8 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.test.component.ExpectedDatumsPersistWriter;
import org.apache.streams.test.component.StringToDocumentConverter;
import org.apache.streams.util.ComponentUtils;
+
import org.junit.After;
-import org.junit.Ignore;
import org.junit.Test;
/**
@@ -31,37 +31,37 @@ import org.junit.Test;
*/
public class TestExpectedDatumsPersitWriter {
- private static final StreamsDatum[] INPUT_DATUMS = new StreamsDatum[] {
- new StreamsDatum("Document1"),
- new StreamsDatum("Document2"),
- new StreamsDatum("Document3"),
- new StreamsDatum("Document4")
+ private static final StreamsDatum[] INPUT_DATUMS = new StreamsDatum[] {
+ new StreamsDatum("Document1"),
+ new StreamsDatum("Document2"),
+ new StreamsDatum("Document3"),
+ new StreamsDatum("Document4")
// Uncomment to prove failures occur, or comment out a datum above
// ,new StreamsDatum("Document5")
- };
+ };
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
+ }
- @Test
- public void testExpectedDatumsPersistWriterFileName() {
- testDatums(new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), "/TestFile.txt"));
- }
+ @Test
+ public void testExpectedDatumsPersistWriterFileName() {
+ testDatums(new ExpectedDatumsPersistWriter(new StringToDocumentConverter(), "/TestFile.txt"));
+ }
- private void testDatums(ExpectedDatumsPersistWriter writer) {
- writer.prepare(null);
- for(StreamsDatum datum : INPUT_DATUMS) {
- writer.write(datum);
- }
- writer.cleanUp();
+ private void testDatums(ExpectedDatumsPersistWriter writer) {
+ writer.prepare(null);
+ for(StreamsDatum datum : INPUT_DATUMS) {
+ writer.write(datum);
}
+ writer.cleanUp();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
index 1ae9a24..a2b7bba 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/tests/TestFileReaderProvider.java
@@ -23,41 +23,39 @@ import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.test.component.FileReaderProvider;
import org.apache.streams.test.component.StringToDocumentConverter;
import org.apache.streams.util.ComponentUtils;
+
import org.junit.After;
-import org.junit.Ignore;
import org.junit.Test;
-import java.io.InputStream;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
/**
*
*/
public class TestFileReaderProvider {
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
+ }
- @Test
- public void testFileReaderProviderFileName() {
- String fileName = "/TestFile.txt";
- FileReaderProvider provider = new FileReaderProvider(fileName, new StringToDocumentConverter());
- provider.prepare(null);
- StreamsResultSet resultSet = provider.readCurrent();
- int count = 0;
- for(StreamsDatum datum : resultSet) {
- ++count;
- }
- assertEquals(4, count);
- provider.cleanUp();
+ @Test
+ public void testFileReaderProviderFileName() {
+ String fileName = "/TestFile.txt";
+ FileReaderProvider provider = new FileReaderProvider(fileName, new StringToDocumentConverter());
+ provider.prepare(null);
+ StreamsResultSet resultSet = provider.readCurrent();
+ int count = 0;
+ for(StreamsDatum datum : resultSet) {
+ ++count;
}
+ assertEquals(4, count);
+ provider.cleanUp();
+ }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
index 9b887af..44ade9c 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
@@ -19,54 +19,47 @@
package org.apache.streams.pig;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.data.ActivitySerializer;
-import org.slf4j.Logger;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
/**
- * Static reflection wrappers for instantiating StreamsComponents
+ * Static reflection wrappers for instantiating StreamsComponents.
*/
public class StreamsComponentFactory {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class);
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class);
- public static ActivityConverter getConverterInstance(Class<?> converterClazz) {
+ public static ActivityConverter getConverterInstance(Class<?> converterClazz) {
- Object object = null;
- try {
- object = converterClazz.getConstructor().newInstance();
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
-
- Preconditions.checkNotNull(object);
+ Object object = null;
+ try {
+ object = converterClazz.getConstructor().newInstance();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
- ActivityConverter converter = (ActivityConverter) object;
+ Preconditions.checkNotNull(object);
- return converter;
+ ActivityConverter converter = (ActivityConverter) object;
- }
+ return converter;
- public static StreamsProcessor getProcessorInstance(Class<?> processorClazz) {
+ }
- Object object = null;
- try {
- object = processorClazz.getConstructor().newInstance();
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
- StreamsProcessor processor = (StreamsProcessor) object;
- return processor;
+ public static StreamsProcessor getProcessorInstance(Class<?> processorClazz) {
+ Object object = null;
+ try {
+ object = processorClazz.getConstructor().newInstance();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
}
+ StreamsProcessor processor = (StreamsProcessor) object;
+ return processor;
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
deleted file mode 100644
index 5ff4145..0000000
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsPigBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.pig;
-
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.core.StreamsProvider;
-import org.joda.time.DateTime;
-
-import java.math.BigInteger;
-
-/**
- * Goal is to be able to build a pig workflow using same syntax as other
- * StreamsBuilders
- *
- * Currently implementers must write own pig scripts to use this module
- */
-public class StreamsPigBuilder implements StreamBuilder {
-
- @Override
- public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration) {
- return null;
- }
-
- @Override
- public StreamsConfiguration getStreamsConfiguration() {
- return null;
- }
-
- @Override
- public StreamBuilder addStreamsProcessor(String s, StreamsProcessor streamsProcessor, int i, String... strings) {
- return null;
- }
-
- @Override
- public StreamBuilder addStreamsPersistWriter(String s, StreamsPersistWriter streamsPersistWriter, int i, String... strings) {
- return null;
- }
-
- @Override
- public StreamBuilder newPerpetualStream(String s, StreamsProvider streamsProvider) {
- return null;
- }
-
- @Override
- public StreamBuilder newReadCurrentStream(String s, StreamsProvider streamsProvider) {
- return null;
- }
-
- @Override
- public StreamBuilder newReadNewStream(String s, StreamsProvider streamsProvider, BigInteger bigInteger) {
- return null;
- }
-
- @Override
- public StreamBuilder newReadRangeStream(String s, StreamsProvider streamsProvider, DateTime dateTime, DateTime dateTime2) {
- return null;
- }
-
- @Override
- public void start() {
-
- }
-
- @Override
- public void stop() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
index 74f7eb5..cd08020 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
@@ -19,21 +19,25 @@
package org.apache.streams.pig;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import datafu.pig.util.AliasableEvalFunc;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.EvalFunc;
import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.data.*;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.RFC3339Utils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -48,40 +52,40 @@ import java.util.concurrent.TimeUnit;
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDatumExec.class);
-
- TupleFactory mTupleFactory = TupleFactory.getInstance();
- BagFactory mBagFactory = BagFactory.getInstance();
-
- StreamsProcessor streamsProcessor;
-
- ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{
- Preconditions.checkNotNull(execArgs);
- Preconditions.checkArgument(execArgs.length > 0);
- String classFullName = execArgs[0];
- Preconditions.checkNotNull(classFullName);
- String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
- streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
- if( execArgs.length == 1 ) {
- LOGGER.debug("prepare (null)");
- streamsProcessor.prepare(null);
- } else if( execArgs.length > 1 ) {
- LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
- streamsProcessor.prepare(prepareArgs);
- }
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDatumExec.class);
+
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
+ BagFactory mBagFactory = BagFactory.getInstance();
+
+ StreamsProcessor streamsProcessor;
+
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{
+ Preconditions.checkNotNull(execArgs);
+ Preconditions.checkArgument(execArgs.length > 0);
+ String classFullName = execArgs[0];
+ Preconditions.checkNotNull(classFullName);
+ String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
+ streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
+ if( execArgs.length == 1 ) {
+ LOGGER.debug("prepare (null)");
+ streamsProcessor.prepare(null);
+ } else if( execArgs.length > 1 ) {
+ LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
+ streamsProcessor.prepare(prepareArgs);
}
+ }
- @Override
- public DataBag exec(Tuple input) throws IOException {
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0)
- return null;
+ if (input == null || input.size() == 0)
+ return null;
- DataBag output = BagFactory.getInstance().newDefaultBag();
+ DataBag output = BagFactory.getInstance().newDefaultBag();
- Configuration conf = UDFContext.getUDFContext().getJobConf();
+ Configuration conf = UDFContext.getUDFContext().getJobConf();
// I would prefer it work this way, but at the moment it doesn't
@@ -95,91 +99,91 @@ public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
// }
// String object = getString(input, "object");
- String id = (String) input.get(0);
- String source = (String) input.get(1);
- Long timestamp;
- try {
- timestamp = (Long) input.get(2);
- } catch( Exception e ) {
- timestamp = RFC3339Utils.parseUTC((String)input.get(2)).getMillis();
- }
- String object = (String) input.get(3);
+ String id = (String) input.get(0);
+ String source = (String) input.get(1);
+ Long timestamp;
+ try {
+ timestamp = (Long) input.get(2);
+ } catch( Exception e ) {
+ timestamp = RFC3339Utils.parseUTC((String)input.get(2)).getMillis();
+ }
+ String object = (String) input.get(3);
- StreamsDatum entry = new StreamsDatum(object, id, new DateTime(timestamp));
+ StreamsDatum entry = new StreamsDatum(object, id, new DateTime(timestamp));
- List<StreamsDatum> resultSet = streamsProcessor.process(entry);
- List<Tuple> resultTupleList = Lists.newArrayList();
+ List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+ List<Tuple> resultTupleList = Lists.newArrayList();
- for( StreamsDatum resultDatum : resultSet ) {
- Tuple tuple = mTupleFactory.newTuple();
- tuple.append(id);
- tuple.append(source);
- tuple.append(timestamp);
+ for( StreamsDatum resultDatum : resultSet ) {
+ Tuple tuple = mTupleFactory.newTuple();
+ tuple.append(id);
+ tuple.append(source);
+ tuple.append(timestamp);
- if( resultDatum.getDocument() instanceof String )
- tuple.append(resultDatum.getDocument());
- else
- tuple.append(mapper.writeValueAsString(resultDatum.getDocument()));
- resultTupleList.add(tuple);
- }
+ if( resultDatum.getDocument() instanceof String )
+ tuple.append(resultDatum.getDocument());
+ else
+ tuple.append(mapper.writeValueAsString(resultDatum.getDocument()));
+ resultTupleList.add(tuple);
+ }
- DataBag result = mBagFactory.newDefaultBag(resultTupleList);
+ DataBag result = mBagFactory.newDefaultBag(resultTupleList);
- return result;
+ return result;
- }
+ }
- public void finish() {
- streamsProcessor.cleanUp();
+ public void finish() {
+ streamsProcessor.cleanUp();
+ }
+
+ @Override
+ public Schema getOutputSchema(Schema schema) {
+ // Check that we were passed two fields
+ String error = "Expected: id\tsource\ttimestamp\tobject";
+ if (schema.size() != 4) {
+ throw new RuntimeException(error);
}
- @Override
- public Schema getOutputSchema(Schema schema) {
- // Check that we were passed two fields
- String error = "Expected: id\tsource\ttimestamp\tobject";
- if (schema.size() != 4) {
- throw new RuntimeException(error);
- }
-
- try {
- // Get the types for both columns and check them. If they are
- // wrong, figure out what types were passed and give a good error
- // message.
- if (schema.getField(0).type != DataType.CHARARRAY &&
- schema.getField(0).type != DataType.LONG) {
- error += "Problem with id: must be CHARARRAY or LONG";
- error += "\t(";
- error += DataType.findTypeName(schema.getField(0).type);
- error += ")\n";
- throw new RuntimeException(error);
- }
- if (schema.getField(1).type != DataType.CHARARRAY) {
- error += "Problem with source: must be CHARARRAY";
- error += "\t(";
- error += DataType.findTypeName(schema.getField(1).type);
- error += ")\n";
- throw new RuntimeException(error);
- }
- if (schema.getField(2).type != DataType.CHARARRAY &&
- schema.getField(2).type != DataType.LONG) {
- error += "Problem with timestamp: must be CHARARRAY or LONG";
- error += "\t(";
- error += DataType.findTypeName(schema.getField(2).type);
- error += ")\n";
- throw new RuntimeException(error);
- }
- if (schema.getField(3).type != DataType.CHARARRAY) {
- error += "Problem with object: must be CHARARRAY";
- error += "\t(";
- error += DataType.findTypeName(schema.getField(3).type);
- error += ")\n";
- throw new RuntimeException(error);
- }
- } catch (Exception e) {
- throw new RuntimeException(error);
- }
-
- // Always hand back the same schema we are passed
- return schema;
+ try {
+ // Get the types for both columns and check them. If they are
+ // wrong, figure out what types were passed and give a good error
+ // message.
+ if (schema.getField(0).type != DataType.CHARARRAY &&
+ schema.getField(0).type != DataType.LONG) {
+ error += "Problem with id: must be CHARARRAY or LONG";
+ error += "\t(";
+ error += DataType.findTypeName(schema.getField(0).type);
+ error += ")\n";
+ throw new RuntimeException(error);
+ }
+ if (schema.getField(1).type != DataType.CHARARRAY) {
+ error += "Problem with source: must be CHARARRAY";
+ error += "\t(";
+ error += DataType.findTypeName(schema.getField(1).type);
+ error += ")\n";
+ throw new RuntimeException(error);
+ }
+ if (schema.getField(2).type != DataType.CHARARRAY &&
+ schema.getField(2).type != DataType.LONG) {
+ error += "Problem with timestamp: must be CHARARRAY or LONG";
+ error += "\t(";
+ error += DataType.findTypeName(schema.getField(2).type);
+ error += ")\n";
+ throw new RuntimeException(error);
+ }
+ if (schema.getField(3).type != DataType.CHARARRAY) {
+ error += "Problem with object: must be CHARARRAY";
+ error += "\t(";
+ error += DataType.findTypeName(schema.getField(3).type);
+ error += ")\n";
+ throw new RuntimeException(error);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(error);
}
+
+ // Always hand back the same schema we are passed
+ return schema;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
index 788b347..2f40923 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -19,25 +19,15 @@
package org.apache.streams.pig;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import datafu.pig.util.SimpleEvalFunc;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.EvalFunc;
import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
import org.slf4j.Logger;
import java.io.IOException;
@@ -54,59 +44,59 @@ import java.util.concurrent.TimeUnit;
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDocumentExec.class);
-
- StreamsProcessor streamsProcessor;
- ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{
- Preconditions.checkNotNull(execArgs);
- Preconditions.checkArgument(execArgs.length > 0);
- String classFullName = execArgs[0];
- Preconditions.checkNotNull(classFullName);
- String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
- streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
- if( execArgs.length == 1 ) {
- LOGGER.debug("prepare (null)");
- streamsProcessor.prepare(null);
- } else if( execArgs.length > 1 ) {
- LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
- streamsProcessor.prepare(prepareArgs);
- }
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsProcessDocumentExec.class);
+
+ StreamsProcessor streamsProcessor;
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{
+ Preconditions.checkNotNull(execArgs);
+ Preconditions.checkArgument(execArgs.length > 0);
+ String classFullName = execArgs[0];
+ Preconditions.checkNotNull(classFullName);
+ String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
+ streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
+ if( execArgs.length == 1 ) {
+ LOGGER.debug("prepare (null)");
+ streamsProcessor.prepare(null);
+ } else if( execArgs.length > 1 ) {
+ LOGGER.debug("prepare " + Arrays.toString(prepareArgs));
+ streamsProcessor.prepare(prepareArgs);
}
+ }
- public String call(String document) throws IOException {
+ public String call(String document) throws IOException {
- Preconditions.checkNotNull(streamsProcessor);
- Preconditions.checkNotNull(document);
+ Preconditions.checkNotNull(streamsProcessor);
+ Preconditions.checkNotNull(document);
- LOGGER.debug(document);
+ LOGGER.debug(document);
- StreamsDatum entry = new StreamsDatum(document);
+ StreamsDatum entry = new StreamsDatum(document);
- Preconditions.checkNotNull(entry);
+ Preconditions.checkNotNull(entry);
- LOGGER.debug(entry.toString());
+ LOGGER.debug(entry.toString());
- List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+ List<StreamsDatum> resultSet = streamsProcessor.process(entry);
- LOGGER.debug(resultSet.toString());
+ LOGGER.debug(resultSet.toString());
- Object resultDoc = null;
- for( StreamsDatum resultDatum : resultSet ) {
- resultDoc = resultDatum.getDocument();
- }
+ Object resultDoc = null;
+ for( StreamsDatum resultDatum : resultSet ) {
+ resultDoc = resultDatum.getDocument();
+ }
- Preconditions.checkNotNull(resultDoc);
+ Preconditions.checkNotNull(resultDoc);
- if( resultDoc instanceof String )
- return (String) resultDoc;
- else
- return mapper.writeValueAsString(resultDoc);
+ if( resultDoc instanceof String )
+ return (String) resultDoc;
+ else
+ return mapper.writeValueAsString(resultDoc);
- }
+ }
- public void finish() {
- streamsProcessor.cleanUp();
- }
+ public void finish() {
+ streamsProcessor.cleanUp();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
deleted file mode 100644
index 7692763..0000000
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsStorage.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.pig;
-
-import org.apache.pig.builtin.PigStorage;
-
-/**
- * It would be nice if streams persisters could be used for input / output
- * within the pig runtime.
- */
-public class StreamsStorage extends PigStorage {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
index 4db38fd..a48a5e8 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/AppendStringProcessor.java
@@ -21,46 +21,47 @@ package org.apache.streams.pig.test;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+
import org.slf4j.Logger;
import java.util.LinkedList;
import java.util.List;
/**
- * Used to Test Pig processor wrapper with arguments to prepare method
+ * Used to Test Pig processor wrapper with arguments to prepare method.
*/
public class AppendStringProcessor implements StreamsProcessor {
- public final static String STREAMS_ID = "AppendStringProcessor";
+ public final static String STREAMS_ID = "AppendStringProcessor";
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(AppendStringProcessor.class);
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(AppendStringProcessor.class);
- String append;
+ String append;
- public AppendStringProcessor() {
- }
+ public AppendStringProcessor() {
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- List<StreamsDatum> resultSet;
- resultSet = new LinkedList<StreamsDatum>();
- String value = (String) entry.getDocument()+ new String(append);
- resultSet.add(new StreamsDatum(value));
- return resultSet;
- }
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ List<StreamsDatum> resultSet;
+ resultSet = new LinkedList<StreamsDatum>();
+ String value = (String) entry.getDocument()+ new String(append);
+ resultSet.add(new StreamsDatum(value));
+ return resultSet;
+ }
- @Override
- public void prepare(Object configurationObject) {
- append = ((String[]) configurationObject)[0];
- }
+ @Override
+ public void prepare(Object configurationObject) {
+ append = ((String[]) configurationObject)[0];
+ }
- @Override
- public void cleanUp() {
- LOGGER.info("Processor clean up");
- }
+ @Override
+ public void cleanUp() {
+ LOGGER.info("Processor clean up");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
index 2b687b1..5336007 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/CopyThriceProcessor.java
@@ -21,46 +21,47 @@ package org.apache.streams.pig.test;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+
import org.slf4j.Logger;
import java.util.LinkedList;
import java.util.List;
/**
- * Used to Test Pig processor wrapper when multiple datums are returned
+ * Used to Test Pig processor wrapper when multiple datums are returned.
*/
public class CopyThriceProcessor implements StreamsProcessor {
- public final static String STREAMS_ID = "CopyThriceProcessor";
+ public final static String STREAMS_ID = "CopyThriceProcessor";
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CopyThriceProcessor.class);
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CopyThriceProcessor.class);
- List<StreamsDatum> result;
+ List<StreamsDatum> result;
- public CopyThriceProcessor() {
- }
+ public CopyThriceProcessor() {
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- this.result = new LinkedList<StreamsDatum>();
- result.add(entry);
- result.add(entry);
- result.add(entry);
- return result;
- }
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ this.result = new LinkedList<StreamsDatum>();
+ result.add(entry);
+ result.add(entry);
+ result.add(entry);
+ return result;
+ }
- @Override
- public void prepare(Object configurationObject) {
+ @Override
+ public void prepare(Object configurationObject) {
- }
+ }
- @Override
- public void cleanUp() {
- LOGGER.info("Processor clean up");
- }
+ @Override
+ public void cleanUp() {
+ LOGGER.info("Processor clean up");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
index 5528a38..07d3b6f 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/DoNothingProcessor.java
@@ -21,44 +21,45 @@ package org.apache.streams.pig.test;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+
import org.slf4j.Logger;
import java.util.LinkedList;
import java.util.List;
/**
- * Used to Test Pig processor wrapper - datum passthrough
+ * Used to Test Pig processor wrapper - datum passthrough.
*/
public class DoNothingProcessor implements StreamsProcessor {
- public final static String STREAMS_ID = "DoNothingProcessor";
+ public final static String STREAMS_ID = "DoNothingProcessor";
- private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DoNothingProcessor.class);
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DoNothingProcessor.class);
- List<StreamsDatum> result;
+ List<StreamsDatum> result;
- public DoNothingProcessor() {
- }
+ public DoNothingProcessor() {
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- this.result = new LinkedList<StreamsDatum>();
- result.add(entry);
- return result;
- }
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ this.result = new LinkedList<StreamsDatum>();
+ result.add(entry);
+ return result;
+ }
- @Override
- public void prepare(Object configurationObject) {
- LOGGER.info("Processor prepare");
- }
+ @Override
+ public void prepare(Object configurationObject) {
+ LOGGER.info("Processor prepare");
+ }
- @Override
- public void cleanUp() {
- LOGGER.info("Processor clean up");
- }
+ @Override
+ public void cleanUp() {
+ LOGGER.info("Processor clean up");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java
index 5dad52c..a983cc7 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigConverterTest.java
@@ -19,13 +19,14 @@
package org.apache.streams.pig.test;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.pig.pigunit.PigTest;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
import org.apache.streams.twitter.converter.TwitterJsonRetweetActivityConverter;
import org.apache.streams.twitter.pojo.Retweet;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.pig.pigunit.PigTest;
import org.apache.tools.ant.util.StringUtils;
import org.junit.Test;
@@ -34,23 +35,23 @@ import org.junit.Test;
*/
public class PigConverterTest {
- @Test
- public void testPigConverter() throws Exception {
+ @Test
+ public void testPigConverter() throws Exception {
- String[] input = {
- "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"create
d_at\":\"Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1
700796190/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contr
ibutors\":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retwe
et_count\":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"
https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}"
- };
+ String[] input = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"created_at\":\
"Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/170079619
0/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contributors\
":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count
\":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https://
si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}"
+ };
- String doc = (String) StringUtils.split(input[0], '\t').get(3);
- ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(new TwitterDateTimeFormat().getFormat()));
- String outdoc = mapper.writeValueAsString(new TwitterJsonRetweetActivityConverter().toActivityList(mapper.readValue(doc, Retweet.class)).get(0));
+ String doc = (String) StringUtils.split(input[0], '\t').get(3);
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(new TwitterDateTimeFormat().getFormat()));
+ String outdoc = mapper.writeValueAsString(new TwitterJsonRetweetActivityConverter().toActivityList(mapper.readValue(doc, Retweet.class)).get(0));
- String[] output = new String[1];
- output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + outdoc + ")";
+ String[] output = new String[1];
+ output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + outdoc + ")";
- PigTest test;
- test = new PigTest("src/test/resources/pigconvertertest.pig");
- test.assertOutput("in", input, "out", output);
+ PigTest test;
+ test = new PigTest("src/test/resources/pigconvertertest.pig");
+ test.assertOutput("in", input, "out", output);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
index 80b17b4..1cb7252 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
@@ -19,74 +19,74 @@
package org.apache.streams.pig.test;
-import org.apache.pig.pigunit.PigTest;
import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.twitter.converter.TwitterJsonTweetActivityConverter;
+
+import org.apache.pig.pigunit.PigTest;
import org.apache.tools.ant.util.StringUtils;
import org.junit.Test;
import java.util.List;
/**
- * These are tests for StreamsProcessDatumExec
+ * These are tests for StreamsProcessDatumExec.
*/
public class PigProcessDatumTest {
- @Test
- public void testPigDoNothingSingleDatum() throws Exception {
- String[] args = {};
+ @Test
+ public void testPigDoNothingSingleDatum() throws Exception {
+ String[] args = {};
- String[] input = {
- "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}",
- };
+ String[] input = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}",
+ };
- DoNothingProcessor processor = new DoNothingProcessor();
+ DoNothingProcessor processor = new DoNothingProcessor();
- String doc = (String) StringUtils.split(input[0], '\t').get(3);
- StreamsDatum inputDatum = new StreamsDatum(doc);
- inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+ String doc = (String) StringUtils.split(input[0], '\t').get(3);
+ StreamsDatum inputDatum = new StreamsDatum(doc);
+ inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
- processor.prepare(null);
+ processor.prepare(null);
- StreamsDatum resultDatum = processor.process(inputDatum).get(0);
- String resultDocument = (String) resultDatum.getDocument();
+ StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+ String resultDocument = (String) resultDatum.getDocument();
- String[] output = new String[1];
- output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultDocument + ")";
+ String[] output = new String[1];
+ output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultDocument + ")";
- PigTest test;
- test = new PigTest("src/test/resources/pigprocessdatumtest.pig", args);
- test.assertOutput("in", input, "out", output);
+ PigTest test;
+ test = new PigTest("src/test/resources/pigprocessdatumtest.pig", args);
+ test.assertOutput("in", input, "out", output);
- }
+ }
- @Test
- public void testPigCopyThriceSingleDatum() throws Exception {
- String[] args = {};
+ @Test
+ public void testPigCopyThriceSingleDatum() throws Exception {
+ String[] args = {};
- String[] input = {
- "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}",
- };
+ String[] input = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}",
+ };
- CopyThriceProcessor processor = new CopyThriceProcessor();
+ CopyThriceProcessor processor = new CopyThriceProcessor();
- String doc = (String) StringUtils.split(input[0], '\t').get(3);
- StreamsDatum inputDatum = new StreamsDatum(doc);
- inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+ String doc = (String) StringUtils.split(input[0], '\t').get(3);
+ StreamsDatum inputDatum = new StreamsDatum(doc);
+ inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
- processor.prepare(null);
+ processor.prepare(null);
- List<StreamsDatum> resultSet = processor.process(inputDatum);
+ List<StreamsDatum> resultSet = processor.process(inputDatum);
- String[] output = new String[resultSet.size()];
+ String[] output = new String[resultSet.size()];
- for( int i = 0; i < output.length; i++ ) {
- output[i] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultSet.get(i).getDocument() + ")";
- }
+ for( int i = 0; i < output.length; i++ ) {
+ output[i] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006," + resultSet.get(i).getDocument() + ")";
+ }
- PigTest test;
- test = new PigTest("src/test/resources/pigprocessdatumcopytest.pig", args);
- test.assertOutput("in", input, "out", output);
+ PigTest test;
+ test = new PigTest("src/test/resources/pigprocessdatumcopytest.pig", args);
+ test.assertOutput("in", input, "out", output);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
index dd30eb1..2832fdc 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
@@ -19,94 +19,95 @@
package org.apache.streams.pig.test;
-import org.apache.pig.pigunit.PigTest;
import org.apache.streams.core.StreamsDatum;
+
+import org.apache.pig.pigunit.PigTest;
import org.apache.tools.ant.util.StringUtils;
import org.junit.Test;
/**
- * These are tests for StreamsProcessDocumentExec
+ * These are tests for StreamsProcessDocumentExec.
*/
public class PigProcessDocumentTest {
- @Test
- public void testPigProcessEmptyDocument() throws Exception {
+ @Test
+ public void testPigProcessEmptyDocument() throws Exception {
- String[] input = {
- "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{}"
- };
+ String[] input = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{}"
+ };
- DoNothingProcessor processor = new DoNothingProcessor();
+ DoNothingProcessor processor = new DoNothingProcessor();
- String doc = (String) StringUtils.split(input[0], '\t').get(3);
- StreamsDatum inputDatum = new StreamsDatum(doc);
- inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+ String doc = (String) StringUtils.split(input[0], '\t').get(3);
+ StreamsDatum inputDatum = new StreamsDatum(doc);
+ inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
- processor.prepare(null);
+ processor.prepare(null);
- StreamsDatum resultDatum = processor.process(inputDatum).get(0);
- String resultDocument = (String) resultDatum.getDocument();
+ StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+ String resultDocument = (String) resultDatum.getDocument();
- String[] output = new String[1];
- output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
+ String[] output = new String[1];
+ output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
- PigTest test;
- test = new PigTest("src/test/resources/pigprocessdocumenttest.pig");
- test.assertOutput("in", input, "out", output);
+ PigTest test;
+ test = new PigTest("src/test/resources/pigprocessdocumenttest.pig");
+ test.assertOutput("in", input, "out", output);
- }
+ }
- @Test
- public void testPigProcessJsonDocument() throws Exception {
+ @Test
+ public void testPigProcessJsonDocument() throws Exception {
- String[] input = {
- "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}"
- };
+ String[] input = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{content:\"content\",[\"a\":1,\"b\":\"c\"}"
+ };
- DoNothingProcessor processor = new DoNothingProcessor();
+ DoNothingProcessor processor = new DoNothingProcessor();
- String doc = (String) StringUtils.split(input[0], '\t').get(3);
- StreamsDatum inputDatum = new StreamsDatum(doc);
- inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+ String doc = (String) StringUtils.split(input[0], '\t').get(3);
+ StreamsDatum inputDatum = new StreamsDatum(doc);
+ inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
- processor.prepare(null);
+ processor.prepare(null);
- StreamsDatum resultDatum = processor.process(inputDatum).get(0);
- String resultDocument = (String) resultDatum.getDocument();
+ StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+ String resultDocument = (String) resultDatum.getDocument();
- String[] output = new String[1];
- output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
+ String[] output = new String[1];
+ output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
- PigTest test;
- test = new PigTest("src/test/resources/pigprocessdocumenttest.pig");
- test.assertOutput("in", input, "out", output);
+ PigTest test;
+ test = new PigTest("src/test/resources/pigprocessdocumenttest.pig");
+ test.assertOutput("in", input, "out", output);
- }
+ }
- @Test
- public void testPigProcessAppendDocument() throws Exception {
+ @Test
+ public void testPigProcessAppendDocument() throws Exception {
- String[] input = {
- "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\thowdy"
- };
+ String[] input = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\thowdy"
+ };
- AppendStringProcessor processor = new AppendStringProcessor();
+ AppendStringProcessor processor = new AppendStringProcessor();
- String doc = (String) StringUtils.split(input[0], '\t').get(3);
- StreamsDatum inputDatum = new StreamsDatum(doc);
- inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
+ String doc = (String) StringUtils.split(input[0], '\t').get(3);
+ StreamsDatum inputDatum = new StreamsDatum(doc);
+ inputDatum.setId((String) StringUtils.split(input[0], '\t').get(0));
- processor.prepare(new String[]{"doody"});
+ processor.prepare(new String[]{"doody"});
- StreamsDatum resultDatum = processor.process(inputDatum).get(0);
- String resultDocument = (String) resultDatum.getDocument();
+ StreamsDatum resultDatum = processor.process(inputDatum).get(0);
+ String resultDocument = (String) resultDatum.getDocument();
- String[] output = new String[1];
- output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
+ String[] output = new String[1];
+ output[0] = "(159475541894897679,twitter,statuses/user_timeline,1384499359006,"+resultDocument+")";
- PigTest test;
- test = new PigTest("src/test/resources/pigprocessdocumentappendtest.pig");
- test.assertOutput("in", input, "out", output);
+ PigTest test;
+ test = new PigTest("src/test/resources/pigprocessdocumentappendtest.pig");
+ test.assertOutput("in", input, "out", output);
- }
+ }
}