You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/26 21:10:22 UTC
[4/6] apex-malhar git commit: Fixed checkstyle errors for demos.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
index 6a6777e..c8a0e51 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@ -57,7 +57,7 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name;
public class WindowedWordCount implements StreamingApplication
{
static final int WINDOW_SIZE = 1; // Default window duration in minutes
-
+
/**
* A input operator that reads from and output a file line by line to downstream with a time gap between
* every two lines.
@@ -65,23 +65,23 @@ public class WindowedWordCount implements StreamingApplication
public static class TextInput extends BaseOperator implements InputOperator
{
private static boolean done = false;
-
+
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-
+
private transient BufferedReader reader;
-
+
public static boolean isDone()
{
return done;
}
-
+
@Override
public void setup(Context.OperatorContext context)
{
done = false;
initReader();
}
-
+
private void initReader()
{
try {
@@ -91,13 +91,13 @@ public class WindowedWordCount implements StreamingApplication
throw Throwables.propagate(ex);
}
}
-
+
@Override
public void teardown()
{
IOUtils.closeQuietly(reader);
}
-
+
@Override
public void emitTuples()
{
@@ -118,16 +118,16 @@ public class WindowedWordCount implements StreamingApplication
}
}
}
-
+
public static class Collector extends BaseOperator
{
private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
-
+
public static Map<KeyValPair<Long, String>, Long> getResult()
{
return result;
}
-
+
public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
{
@Override
@@ -137,7 +137,7 @@ public class WindowedWordCount implements StreamingApplication
}
};
}
-
+
/**
* A Pojo Tuple class used for outputting result to JDBC.
*/
@@ -146,44 +146,44 @@ public class WindowedWordCount implements StreamingApplication
private String word;
private long count;
private long timestamp;
-
+
@Override
public String toString()
{
return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")";
}
-
+
public String getWord()
{
return word;
}
-
+
public void setWord(String word)
{
this.word = word;
}
-
+
public long getCount()
{
return count;
}
-
+
public void setCount(long count)
{
this.count = count;
}
-
+
public long getTimestamp()
{
return timestamp;
}
-
+
public void setTimestamp(long timestamp)
{
this.timestamp = timestamp;
}
}
-
+
/**
* A map function that wrap the input string with a random generated timestamp.
*/
@@ -191,12 +191,12 @@ public class WindowedWordCount implements StreamingApplication
{
private static final Duration RAND_RANGE = Duration.standardMinutes(10);
private final Long minTimestamp;
-
+
AddTimestampFn()
{
this.minTimestamp = System.currentTimeMillis();
}
-
+
@Override
public Tuple.TimestampedTuple<String> f(String input)
{
@@ -207,7 +207,7 @@ public class WindowedWordCount implements StreamingApplication
return new Tuple.TimestampedTuple<>(randomTimestamp, input);
}
}
-
+
/** A MapFunction that converts a Word and Count into a PojoEvent. */
public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
{
@@ -221,7 +221,7 @@ public class WindowedWordCount implements StreamingApplication
return row;
}
}
-
+
/**
* Populate dag with High-Level API.
* @param dag
@@ -232,10 +232,10 @@ public class WindowedWordCount implements StreamingApplication
{
TextInput input = new TextInput();
Collector collector = new Collector();
-
+
// Create stream from the TextInput operator.
ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
-
+
// Extract all the words from the input line of text.
.flatMap(new Function.FlatMapFunction<String, String>()
{
@@ -245,18 +245,18 @@ public class WindowedWordCount implements StreamingApplication
return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
}
}, name("ExtractWords"))
-
+
// Wrap the word with a randomly generated timestamp.
.map(new AddTimestampFn(), name("AddTimestampFn"));
-
-
+
+
// apply window and trigger option.
// TODO: change trigger option to atWaterMark when available.
WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
.window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
-
-
+
+
WindowedStream<PojoEvent> wordCounts =
// Perform a countByKey transformation to count the appearance of each word in every time window.
windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
@@ -268,10 +268,10 @@ public class WindowedWordCount implements StreamingApplication
new KeyValPair<String, Long>(input.getValue(), 1L));
}
}, name("count words"))
-
+
// Format the output and print out the result.
.map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print();
-
+
wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
index 29c8cf9..00c40e7 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -79,12 +79,12 @@ public class AutoComplete implements StreamingApplication
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
private transient BufferedReader reader;
-
+
public static boolean isDone()
{
return done;
}
-
+
@Override
public void setup(OperatorContext context)
{
@@ -128,16 +128,16 @@ public class AutoComplete implements StreamingApplication
}
}
}
-
+
public static class Collector extends BaseOperator
{
private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
-
+
public static Map<String, List<CompletionCandidate>> getResult()
{
return result;
}
-
+
public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
{
@Override
@@ -193,7 +193,7 @@ public class AutoComplete implements StreamingApplication
@Override
public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
{
- // TODO: Should be removed after Auto-wrapping is supported.
+ // TODO: Should be removed after Auto-wrapping is supported.
return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
}
});
@@ -271,7 +271,8 @@ public class AutoComplete implements StreamingApplication
{
return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
}
- }, name("countByKey")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+ }, name("countByKey"))
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
{
@Override
public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
@@ -300,7 +301,7 @@ public class AutoComplete implements StreamingApplication
ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler"))
.flatMap(new ExtractHashtags());
-
+
tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
.addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input, name("collector"))
.populateDag(dag);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
index 8a7113e..5531b5e 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
@@ -45,7 +45,7 @@ public class CompletionCandidate implements Comparable<CompletionCandidate>
// Empty constructor required for Kryo.
public CompletionCandidate()
{
-
+
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
index 2a4c003..e7eb90c 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
@@ -24,18 +24,18 @@ package org.apache.apex.malhar.stream.sample.complete;
public class PojoEvent extends Object
{
private String stringValue;
-
+
@Override
public String toString()
{
return "PojoEvent [stringValue=" + getStringValue() + "]";
}
-
+
public void setStringValue(String newString)
{
this.stringValue = newString;
}
-
+
public String getStringValue()
{
return this.stringValue;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
index 2ffdc82..845901a 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@ -45,17 +45,17 @@ public class StreamingWordExtract implements StreamingApplication
{
private static int wordCount = 0; // A counter to count number of words have been extracted.
private static int entriesMapped = 0; // A counter to count number of entries have been mapped.
-
+
public int getWordCount()
{
return wordCount;
}
-
+
public int getEntriesMapped()
{
return entriesMapped;
}
-
+
/**
* A MapFunction that tokenizes lines of text into individual words.
*/
@@ -69,8 +69,8 @@ public class StreamingWordExtract implements StreamingApplication
return result;
}
}
-
-
+
+
/**
* A MapFunction that uppercases a word.
*/
@@ -82,8 +82,8 @@ public class StreamingWordExtract implements StreamingApplication
return input.toUpperCase();
}
}
-
-
+
+
/**
* A filter function to filter out empty strings.
*/
@@ -95,14 +95,14 @@ public class StreamingWordExtract implements StreamingApplication
return !input.isEmpty();
}
}
-
-
+
+
/**
* A map function to map the result string to a pojo entry.
*/
public static class PojoMapper implements Function.MapFunction<String, Object>
{
-
+
@Override
public Object f(String input)
{
@@ -112,7 +112,7 @@ public class StreamingWordExtract implements StreamingApplication
return pojo;
}
}
-
+
/**
* Add field infos to the {@link JdbcPOJOInsertOutputOperator}.
*/
@@ -122,7 +122,7 @@ public class StreamingWordExtract implements StreamingApplication
fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
return fieldInfos;
}
-
+
/**
* Populate dag with High-Level API.
* @param dag
@@ -136,25 +136,25 @@ public class StreamingWordExtract implements StreamingApplication
JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
jdbcOutput.setStore(outputStore);
jdbcOutput.setTablename("TestTable");
-
+
// Create a stream reading from a folder.
ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
// Extract all the words from the input line of text.
stream.flatMap(new ExtractWords())
-
+
// Filter out the empty strings.
.filter(new EmptyStringFilter())
-
+
// Change every word to uppercase.
.map(new Uppercase())
-
+
// Map the resulted word to a Pojo entry.
.map(new PojoMapper())
-
+
// Output the entries to JdbcOutput and insert them into a table.
.endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
-
+
stream.populateDag(dag);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index f2e70b1..d7d62fe 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -63,23 +63,23 @@ public class TopWikipediaSessions implements StreamingApplication
{
private String[] names = new String[]{"user1", "user2", "user3", "user4"};
public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
-
+
private static final Duration RAND_RANGE = Duration.standardDays(365);
private Long minTimestamp;
private long sleepTime;
private static int tupleCount = 0;
-
+
public static int getTupleCount()
{
return tupleCount;
}
-
+
private String randomName(String[] names)
{
int index = new Random().nextInt(names.length);
return names[index];
}
-
+
@Override
public void setup(Context.OperatorContext context)
{
@@ -88,7 +88,7 @@ public class TopWikipediaSessions implements StreamingApplication
minTimestamp = System.currentTimeMillis();
sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
}
-
+
@Override
public void emitTuples()
{
@@ -103,17 +103,17 @@ public class TopWikipediaSessions implements StreamingApplication
}
}
}
-
+
public static class Collector extends BaseOperator
{
private final int resultSize = 5;
private static List<List<TempWrapper>> result = new ArrayList<>();
-
+
public static List<List<TempWrapper>> getResult()
{
return result;
}
-
+
public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
{
@Override
@@ -126,8 +126,8 @@ public class TopWikipediaSessions implements StreamingApplication
}
};
}
-
-
+
+
/**
* Convert the upstream (user, time) combination to a timestamped tuple of user.
*/
@@ -138,13 +138,13 @@ public class TopWikipediaSessions implements StreamingApplication
{
long timestamp = input.getValue();
String userName = input.getKey();
-
+
// Sets the implicit timestamp field to be used in windowing.
return new Tuple.TimestampedTuple<>(timestamp, userName);
-
+
}
}
-
+
/**
* Computes the number of edits in each user session. A session is defined as
* a string of edits where each is separated from the next by less than an hour.
@@ -156,10 +156,10 @@ public class TopWikipediaSessions implements StreamingApplication
public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
{
return inputStream
-
+
// Chuck the stream into session windows.
.window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-
+
// Count the number of edits for a user within one session.
.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
{
@@ -171,7 +171,7 @@ public class TopWikipediaSessions implements StreamingApplication
}, name("ComputeSessions"));
}
}
-
+
/**
* A comparator class used for comparing two TempWrapper objects.
*/
@@ -183,7 +183,7 @@ public class TopWikipediaSessions implements StreamingApplication
return Long.compare(o1.getValue().getValue(), o2.getValue().getValue());
}
}
-
+
/**
* A function to extract timestamp from a TempWrapper object.
*/
@@ -196,7 +196,7 @@ public class TopWikipediaSessions implements StreamingApplication
return input.getTimestamp();
}
}
-
+
/**
* A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason
* for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can
@@ -206,39 +206,39 @@ public class TopWikipediaSessions implements StreamingApplication
{
private KeyValPair<String, Long> value;
private Long timestamp;
-
+
public TempWrapper()
{
-
+
}
-
+
public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
{
this.value = value;
this.timestamp = timestamp;
}
-
+
@Override
public String toString()
{
return this.value + " - " + this.timestamp;
}
-
+
public Long getTimestamp()
{
return timestamp;
}
-
+
public void setTimestamp(Long timestamp)
{
this.timestamp = timestamp;
}
-
+
public KeyValPair<String, Long> getValue()
{
return value;
}
-
+
public void setValue(KeyValPair<String, Long> value)
{
this.value = value;
@@ -251,16 +251,16 @@ public class TopWikipediaSessions implements StreamingApplication
private static class TopPerMonth
extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
{
-
+
@Override
public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
{
TopN<TempWrapper> topN = new TopN<>();
topN.setN(10);
topN.setComparator(new Comp());
-
+
return inputStream
-
+
// Map the input WindowedTuple to a TempWrapper object.
.map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
{
@@ -270,15 +270,15 @@ public class TopWikipediaSessions implements StreamingApplication
return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp());
}
}, name("TempWrapper"))
-
+
// Apply window and trigger option again, this time chuck the stream into fixed time windows.
.window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
-
+
// Compute the top 10 user-sessions with most number of edits.
.accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
}
}
-
+
/**
* A map function that combine the user and his/her edit session together to a string and use that string as a key
* with number of edits in that session as value to create a new key value pair to send to downstream.
@@ -293,7 +293,7 @@ public class TopWikipediaSessions implements StreamingApplication
input.getValue().getValue()));
}
}
-
+
/**
* A flapmap function that turns the result into readable format.
*/
@@ -311,7 +311,7 @@ public class TopWikipediaSessions implements StreamingApplication
return result;
}
}
-
+
/**
* A composite transform that compute the top wikipedia sessions.
*/
@@ -327,7 +327,7 @@ public class TopWikipediaSessions implements StreamingApplication
.addCompositeStreams(new TopPerMonth());
}
}
-
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index 26a2823..3045238 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -63,7 +63,7 @@ public class TrafficRoutes implements StreamingApplication
static Map<String, String> sdStations = buildStationInfo();
static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes
-
+
/**
* This class holds information about a station reading's average speed.
*/
@@ -75,54 +75,54 @@ public class TrafficRoutes implements StreamingApplication
Double avgSpeed;
@Nullable
Long timestamp;
-
+
public StationSpeed() {}
-
+
public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
{
this.stationId = stationId;
this.avgSpeed = avgSpeed;
this.timestamp = timestamp;
}
-
+
public void setAvgSpeed(@Nullable Double avgSpeed)
{
this.avgSpeed = avgSpeed;
}
-
+
public void setStationId(@Nullable String stationId)
{
this.stationId = stationId;
}
-
+
public void setTimestamp(@Nullable Long timestamp)
{
this.timestamp = timestamp;
}
-
+
@Nullable
public Long getTimestamp()
{
return timestamp;
}
-
+
public String getStationId()
{
return this.stationId;
}
-
+
public Double getAvgSpeed()
{
return this.avgSpeed;
}
-
+
@Override
public int compareTo(StationSpeed other)
{
return Long.compare(this.timestamp, other.timestamp);
}
}
-
+
/**
* This class holds information about a route's speed/slowdown.
*/
@@ -134,63 +134,63 @@ public class TrafficRoutes implements StreamingApplication
Double avgSpeed;
@Nullable
Boolean slowdownEvent;
-
+
public RouteInfo()
{
-
+
}
-
+
public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
{
this.route = route;
this.avgSpeed = avgSpeed;
this.slowdownEvent = slowdownEvent;
}
-
+
public String getRoute()
{
return this.route;
}
-
+
public Double getAvgSpeed()
{
return this.avgSpeed;
}
-
+
public Boolean getSlowdownEvent()
{
return this.slowdownEvent;
}
}
-
+
/**
* Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple}
* with the extracted timestamp.
*/
static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
{
-
+
@Override
public Tuple.TimestampedTuple<String> f(String input)
{
String[] items = input.split(",");
String timestamp = tryParseTimestamp(items);
-
+
return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input);
}
}
-
+
/**
* Filter out readings for the stations along predefined 'routes', and output
* (station, speed info) keyed on route.
*/
static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
{
-
+
@Override
public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
{
-
+
ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>();
String[] items = input.getValue().split(",");
String stationType = tryParseStationType(items);
@@ -210,7 +210,7 @@ public class TrafficRoutes implements StreamingApplication
return result;
}
}
-
+
/**
* For a given route, track average speed for the window. Calculate whether
* traffic is currently slowing down, via a predefined threshold. If a supermajority of
@@ -261,7 +261,7 @@ public class TrafficRoutes implements StreamingApplication
return result;
}
}
-
+
/**
* Output Pojo class for outputting result to JDBC.
*/
@@ -271,11 +271,11 @@ public class TrafficRoutes implements StreamingApplication
private Boolean slowdownEvent;
private String key;
private Long timestamp;
-
+
public OutputPojo()
{
}
-
+
public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
{
this.avgSpeed = avgSpeed;
@@ -283,64 +283,64 @@ public class TrafficRoutes implements StreamingApplication
this.key = key;
this.timestamp = timestamp;
}
-
+
@Override
public String toString()
{
return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
}
-
+
public void setTimestamp(Long timestamp)
{
this.timestamp = timestamp;
}
-
+
public Long getTimestamp()
{
return timestamp;
}
-
+
public void setAvgSpeed(Double avgSpeed)
{
this.avgSpeed = avgSpeed;
}
-
+
public Double getAvgSpeed()
{
return avgSpeed;
}
-
+
public void setKey(String key)
{
this.key = key;
}
-
+
public String getKey()
{
return key;
}
-
+
public void setSlowdownEvent(Boolean slowdownEvent)
{
this.slowdownEvent = slowdownEvent;
}
-
+
public Boolean getSlowdownEvent()
{
return slowdownEvent;
}
-
+
}
-
+
public static class Collector extends BaseOperator
{
private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
-
+
public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
{
return result;
}
-
+
public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
{
@Override
@@ -350,7 +350,7 @@ public class TrafficRoutes implements StreamingApplication
}
};
}
-
+
/**
* Format the results of the slowdown calculations to a OutputPojo.
*/
@@ -364,8 +364,8 @@ public class TrafficRoutes implements StreamingApplication
return row;
}
}
-
-
+
+
/**
* This composite transformation extracts speed info from traffic station readings.
* It groups the readings by 'route' and analyzes traffic slowdown for that route.
@@ -389,19 +389,19 @@ public class TrafficRoutes implements StreamingApplication
return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
}
}, name("GroupByKey"));
-
+
// Analyze 'slowdown' over the route readings.
WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
.flatMap(new GatherStats(), name("GatherStats"));
-
+
// Format the results for writing to JDBC table.
WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));
-
+
return results;
}
}
-
-
+
+
private static Double tryParseAvgSpeed(String[] inputItems)
{
try {
@@ -412,27 +412,27 @@ public class TrafficRoutes implements StreamingApplication
return null;
}
}
-
+
private static String tryParseStationType(String[] inputItems)
{
return tryParseString(inputItems, 2);
}
-
+
private static String tryParseStationId(String[] inputItems)
{
return tryParseString(inputItems, 1);
}
-
+
private static String tryParseTimestamp(String[] inputItems)
{
return tryParseString(inputItems, 0);
}
-
+
private static String tryParseString(String[] inputItems, int index)
{
return inputItems.length >= index ? inputItems[index] : null;
}
-
+
/**
* Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
*/
@@ -444,33 +444,33 @@ public class TrafficRoutes implements StreamingApplication
stations.put("1108702", "SDRoute2");
return stations;
}
-
+
/**
* A dummy generator to generate some traffic information.
*/
public static class InfoGen extends BaseOperator implements InputOperator
{
public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-
+
private String[] stationTypes = new String[]{"ML", "BL", "GL"};
private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
private double ave = 55.0;
private long timestamp;
private static final Duration RAND_RANGE = Duration.standardMinutes(10);
private static int tupleCount = 0;
-
+
public static int getTupleCount()
{
return tupleCount;
}
-
+
@Override
public void setup(Context.OperatorContext context)
{
tupleCount = 0;
timestamp = System.currentTimeMillis();
}
-
+
@Override
public void emitTuples()
{
@@ -481,7 +481,7 @@ public class TrafficRoutes implements StreamingApplication
try {
output.emit(time + "," + stationID + "," + stationType + "," + speed);
tupleCount++;
-
+
Thread.sleep(50);
} catch (Exception e) {
// Ignore it
@@ -490,29 +490,29 @@ public class TrafficRoutes implements StreamingApplication
}
}
}
-
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
InfoGen infoGen = new InfoGen();
Collector collector = new Collector();
-
+
// Create a stream from the input operator.
ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
-
+
// Extract the timestamp from the input and wrap it into a TimestampedTuple.
.map(new ExtractTimestamps(), name("ExtractTimestamps"));
-
+
stream
// Extract the average speed of a station.
.flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
-
+
// Apply window and trigger option.
.window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
-
+
// Apply TrackSpeed composite transformation to compute the route information.
.addCompositeStreams(new TrackSpeed())
-
+
// print the result to console.
.print()
.endWith(collector, collector.input, name("Collector"))
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
index ecad622..a4fdf24 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -200,7 +200,8 @@ public class TwitterAutoComplete implements StreamingApplication
{
return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
}
- }, name("Hashtag Count")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+ }, name("Hashtag Count"))
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
{
@Override
public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index d88a8dc..7c16521 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -94,7 +94,7 @@ public class CombinePerKeyExamples implements StreamingApplication
return new SampleBean(input.getValue().getKey(), input.getValue().getValue());
}
}
-
+
/**
* A reduce function to concat two strings together.
*/
@@ -106,7 +106,7 @@ public class CombinePerKeyExamples implements StreamingApplication
return input1 + ", " + input2;
}
}
-
+
/**
* Reads the public 'Shakespeare' data, and for each word in the dataset
* over a given length, generates a string containing the list of play names
@@ -114,17 +114,17 @@ public class CombinePerKeyExamples implements StreamingApplication
*/
private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
{
-
+
@Override
public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
{
return inputStream
// Extract words from the input SampleBeam stream.
.map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
-
+
// Apply window and trigger option to the streams.
.window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-
+
// Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
.reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>()
{
@@ -134,13 +134,13 @@ public class CombinePerKeyExamples implements StreamingApplication
return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
}
}, name("Concat"))
-
+
// Format the output back to a SampleBeam object.
.map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
}
}
-
-
+
+
/**
* A Java Beam class that contains information about a word appears in a corpus written by Shakespeare.
*/
@@ -157,13 +157,13 @@ public class CombinePerKeyExamples implements StreamingApplication
this.word = word;
this.corpus = corpus;
}
-
+
@Override
public String toString()
{
return this.word + " : " + this.corpus;
}
-
+
private String word;
private String corpus;
@@ -188,7 +188,7 @@ public class CombinePerKeyExamples implements StreamingApplication
return corpus;
}
}
-
+
/**
* A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare'
* data.
@@ -200,19 +200,19 @@ public class CombinePerKeyExamples implements StreamingApplication
private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
private static int i;
-
+
public static int getI()
{
return i;
}
-
+
@Override
public void setup(Context.OperatorContext context)
{
super.setup(context);
i = 0;
}
-
+
@Override
public void emitTuples()
{
@@ -229,20 +229,20 @@ public class CombinePerKeyExamples implements StreamingApplication
}
i++;
}
-
+
}
}
-
+
public static class Collector extends BaseOperator
{
static List<SampleBean> result;
-
+
@Override
public void setup(Context.OperatorContext context)
{
result = new ArrayList<>();
}
-
+
public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
{
@Override
@@ -252,7 +252,7 @@ public class CombinePerKeyExamples implements StreamingApplication
}
};
}
-
+
/**
* Populate dag using High-Level API.
* @param dag
@@ -268,6 +268,6 @@ public class CombinePerKeyExamples implements StreamingApplication
.print()
.endWith(collector, collector.input, name("Collector"))
.populateDag(dag);
-
+
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
index 2930010..0cd7c58 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -47,22 +47,22 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name;
@ApplicationAnnotation(name = "DeDupExample")
public class DeDupExample implements StreamingApplication
{
-
+
public static class Collector extends BaseOperator
{
private static Tuple.WindowedTuple<List<String>> result;
private static boolean done = false;
-
+
public static Tuple.WindowedTuple<List<String>> getResult()
{
return result;
}
-
+
public static boolean isDone()
{
return done;
}
-
+
@Override
public void setup(Context.OperatorContext context)
{
@@ -70,7 +70,7 @@ public class DeDupExample implements StreamingApplication
result = new Tuple.WindowedTuple<>();
done = false;
}
-
+
public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>()
{
@Override
@@ -83,15 +83,15 @@ public class DeDupExample implements StreamingApplication
}
};
}
-
+
@Override
public void populateDAG(DAG dag, Configuration conf)
{
Collector collector = new Collector();
-
+
// Create a stream that reads from files in a local folder and output lines one by one to downstream.
ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
-
+
// Extract all the words from the input line of text.
.flatMap(new Function.FlatMapFunction<String, String>()
{
@@ -101,7 +101,7 @@ public class DeDupExample implements StreamingApplication
return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
}
}, name("ExtractWords"))
-
+
// Change the words to lower case, also shutdown the app when the word "bye" is detected.
.map(new Function.MapFunction<String, String>()
{
@@ -111,14 +111,14 @@ public class DeDupExample implements StreamingApplication
return input.toLowerCase();
}
}, name("ToLowerCase"));
-
+
// Apply window and trigger option.
stream.window(new WindowOption.GlobalWindow(),
new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
-
+
// Remove the duplicate words and print out the result.
.accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")).print().endWith(collector, collector.input)
-
+
.populateDag(dag);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
index 3643eab..1ba2a90 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
@@ -27,48 +27,48 @@ public class InputPojo extends Object
private int day;
private int year;
private double meanTemp;
-
+
@Override
public String toString()
{
return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]";
}
-
+
public void setMonth(int month)
{
this.month = month;
}
-
+
public int getMonth()
{
return this.month;
}
-
+
public void setDay(int day)
{
this.day = day;
}
-
+
public int getDay()
{
return day;
}
-
+
public void setYear(int year)
{
this.year = year;
}
-
+
public int getYear()
{
return year;
}
-
+
public void setMeanTemp(double meanTemp)
{
this.meanTemp = meanTemp;
}
-
+
public double getMeanTemp()
{
return meanTemp;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
index 02980e4..4538aef 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -55,7 +55,7 @@ import static org.apache.apex.malhar.stream.api.Option.Options.name;
@ApplicationAnnotation(name = "MaxPerKeyExamples")
public class MaxPerKeyExamples implements StreamingApplication
{
-
+
/**
* A map function to extract the mean temperature from {@link InputPojo}.
*/
@@ -69,8 +69,8 @@ public class MaxPerKeyExamples implements StreamingApplication
return new KeyValPair<Integer, Double>(month, meanTemp);
}
}
-
-
+
+
/**
* A map function to format output to {@link OutputPojo}.
*/
@@ -85,7 +85,7 @@ public class MaxPerKeyExamples implements StreamingApplication
return row;
}
}
-
+
/**
* A composite transformation to perform three tasks:
* 1. extract the month and its mean temperature from input pojo.
@@ -99,7 +99,7 @@ public class MaxPerKeyExamples implements StreamingApplication
{
// InputPojo... => <month, meanTemp> ...
WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn"));
-
+
// month, meanTemp... => <month, max mean temp>...
WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes =
temps.accumulateByKey(new Max<Double>(),
@@ -111,14 +111,14 @@ public class MaxPerKeyExamples implements StreamingApplication
return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input);
}
}, name("MaxPerMonth"));
-
+
// <month, max>... => OutputPojo...
WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn"));
-
+
return results;
}
}
-
+
/**
* Method to set field info for {@link JdbcPOJOInputOperator}.
* @return
@@ -132,7 +132,7 @@ public class MaxPerKeyExamples implements StreamingApplication
fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE));
return fieldInfos;
}
-
+
/**
* Method to set field info for {@link JdbcPOJOInsertOutputOperator}.
* @return
@@ -144,8 +144,8 @@ public class MaxPerKeyExamples implements StreamingApplication
fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE));
return fieldInfos;
}
-
-
+
+
/**
* Populate the dag using High-Level API.
* @param dag
@@ -156,21 +156,21 @@ public class MaxPerKeyExamples implements StreamingApplication
{
JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator();
jdbcInput.setFieldInfos(addInputFieldInfos());
-
+
JdbcStore store = new JdbcStore();
jdbcInput.setStore(store);
-
+
JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
jdbcOutput.setFieldInfos(addOutputFieldInfos());
JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
jdbcOutput.setStore(outputStore);
-
+
// Create stream that reads from a Jdbc Input.
ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput"))
-
+
// Apply window and trigger option to the stream.
.window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-
+
// Because Jdbc Input sends out a stream of Object, need to cast them to InputPojo.
.map(new Function.MapFunction<Object, InputPojo>()
{
@@ -180,10 +180,10 @@ public class MaxPerKeyExamples implements StreamingApplication
return (InputPojo)input;
}
}, name("ObjectToInputPojo"))
-
+
// Plug in the composite transformation to the stream to calculate the maximum temperature for each month.
.addCompositeStreams(new MaxMeanTemp())
-
+
// Cast the resulted OutputPojo to Object for Jdbc Output to consume.
.map(new Function.MapFunction<OutputPojo, Object>()
{
@@ -193,11 +193,11 @@ public class MaxPerKeyExamples implements StreamingApplication
return (Object)input;
}
}, name("OutputPojoToObject"))
-
+
// Output the result to Jdbc Output.
.endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput"));
-
+
stream.populateDag(dag);
-
+
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
index db2a09e..59831b7 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
@@ -25,28 +25,28 @@ public class OutputPojo
{
private int month;
private double meanTemp;
-
+
@Override
public String toString()
{
return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]";
}
-
+
public void setMonth(int month)
{
this.month = month;
}
-
+
public int getMonth()
{
return this.month;
}
-
+
public void setMeanTemp(double meanTemp)
{
this.meanTemp = meanTemp;
}
-
+
public double getMeanTemp()
{
return meanTemp;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
index bf23e3a..dd09352 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -189,7 +189,7 @@ public class TriggerExample
// At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
// result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
// late, and dropped.
-
+
WindowedStream<SampleBean> defaultTriggerResults = inputStream
.window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
new TriggerOption().discardingFiredPanes())
@@ -306,7 +306,7 @@ public class TriggerExample
@Override
public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
{
-
+
WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
.groupByKey(new ExtractFlowInfo());
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
index 101953f..d32da72 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
@@ -37,11 +37,11 @@ public class MinimalWordCountTest
{
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
-
+
MinimalWordCount app = new MinimalWordCount();
lma.prepareDAG(app, conf);
-
+
LocalMode.Controller lc = lma.getController();
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@@ -51,9 +51,9 @@ public class MinimalWordCountTest
return MinimalWordCount.Collector.isDone();
}
});
-
+
lc.run(10000);
-
+
Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7);
Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119);
Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
index 952356f..f6270d4 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
@@ -56,16 +56,16 @@ public class WindowedWordCountTest
return WindowedWordCount.TextInput.isDone();
}
});
-
+
lc.run(60000);
-
+
Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult()));
Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2"));
Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error"));
Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9"));
Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye"));
}
-
+
public long countSum(Map<KeyValPair<Long, String>, Long> map)
{
long sum = 0;
@@ -74,7 +74,7 @@ public class WindowedWordCountTest
}
return sum;
}
-
+
public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word)
{
long sum = 0;
@@ -85,6 +85,6 @@ public class WindowedWordCountTest
}
return sum;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
index dc236f9..26bb13e 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
@@ -41,7 +41,7 @@ public class AutoCompleteTest
Configuration conf = new Configuration(false);
lma.prepareDAG(new AutoComplete(), conf);
LocalMode.Controller lc = lma.getController();
-
+
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
@@ -50,9 +50,9 @@ public class AutoCompleteTest
return AutoComplete.TweetsInput.isDone();
}
});
-
+
lc.run(200000);
-
+
Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("chi"));
Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("china"));
Assert.assertEquals(2, AutoComplete.Collector.getResult().get("china").get(0).getCount());
@@ -61,6 +61,6 @@ public class AutoCompleteTest
Assert.assertEquals(3, AutoComplete.Collector.getResult().get("f").size());
Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(0).getCount() >= AutoComplete.Collector.getResult().get("f").get(1).getCount());
Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(1).getCount() >= AutoComplete.Collector.getResult().get("f").get(2).getCount());
-
+
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
index bf9b030..dc9cdec 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
@@ -54,10 +54,10 @@ public class StreamingWordExtractTest
{
try {
Class.forName(DB_DRIVER).newInstance();
-
+
Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
Statement stmt = con.createStatement();
-
+
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
@@ -66,16 +66,16 @@ public class StreamingWordExtractTest
+ JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ ")";
stmt.executeUpdate(createMetaTable);
-
+
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ "(STRINGVALUE VARCHAR(255))";
stmt.executeUpdate(createTable);
-
+
} catch (Throwable e) {
throw Throwables.propagate(e);
}
}
-
+
@After
public void cleanTable()
{
@@ -88,7 +88,7 @@ public class StreamingWordExtractTest
throw new RuntimeException(e);
}
}
-
+
public void setConfig(Configuration conf)
{
conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
@@ -99,14 +99,14 @@ public class StreamingWordExtractTest
conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME);
}
-
+
public int getNumOfEventsInStore()
{
Connection con;
try {
con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
Statement stmt = con.createStatement();
-
+
String countQuery = "SELECT count(*) from " + TABLE_NAME;
ResultSet resultSet = stmt.executeQuery(countQuery);
resultSet.next();
@@ -115,7 +115,7 @@ public class StreamingWordExtractTest
throw new RuntimeException("fetching count", e);
}
}
-
+
@Test
public void StreamingWordExtractTest() throws Exception
{
@@ -125,7 +125,7 @@ public class StreamingWordExtractTest
StreamingWordExtract app = new StreamingWordExtract();
lma.prepareDAG(app, conf);
LocalMode.Controller lc = lma.getController();
-
+
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
@@ -134,11 +134,11 @@ public class StreamingWordExtractTest
return getNumOfEventsInStore() == 36;
}
});
-
+
lc.run(10000);
-
+
Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore());
Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore());
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
index f8ec086..c0dbaf4 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
@@ -41,7 +41,7 @@ public class TopWikipediaSessionsTest
Configuration conf = new Configuration(false);
lma.prepareDAG(new TopWikipediaSessions(), conf);
LocalMode.Controller lc = lma.getController();
-
+
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
@@ -50,14 +50,14 @@ public class TopWikipediaSessionsTest
return TopWikipediaSessions.SessionGen.getTupleCount() >= 250;
}
});
-
+
lc.run(30000);
-
+
for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) {
Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i)));
}
}
-
+
public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input)
{
if (input.size() == 0 || input.size() == 1) {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
index e363ca7..c532898 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
@@ -43,7 +43,7 @@ public class TrafficRoutesTest
Configuration conf = new Configuration(false);
lma.prepareDAG(new TrafficRoutes(), conf);
LocalMode.Controller lc = lma.getController();
-
+
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
@@ -52,9 +52,9 @@ public class TrafficRoutesTest
return TrafficRoutes.InfoGen.getTupleCount() >= 100;
}
});
-
+
lc.run(60000);
-
+
Assert.assertTrue(!TrafficRoutes.Collector.getResult().isEmpty());
for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) {
Assert.assertTrue(entry.getValue().getKey() <= 75);
@@ -62,5 +62,5 @@ public class TrafficRoutesTest
Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2"));
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
index 5858013..b130808 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -35,11 +35,11 @@ public class CombinePerKeyExamplesTest
{
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
-
+
CombinePerKeyExamples app = new CombinePerKeyExamples();
-
+
lma.prepareDAG(app, conf);
-
+
LocalMode.Controller lc = lma.getController();
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@@ -50,7 +50,7 @@ public class CombinePerKeyExamplesTest
}
});
lc.run(100000);
-
+
Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size() - 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
index ed4ddb4..a175cd7 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -38,7 +38,7 @@ public class DeDupExampleTest
{
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
-
+
DeDupExample app = new DeDupExample();
lma.prepareDAG(app, conf);
LocalMode.Controller lc = lma.getController();
@@ -51,9 +51,9 @@ public class DeDupExampleTest
}
});
lc.run(50000);
-
+
Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size());
-
+
}
-
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
index 51981de..ec28b40 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
@@ -46,7 +46,7 @@ import com.datatorrent.stram.StramLocalCluster;
*/
public class MaxPerKeyExamplesTest
{
-
+
private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo";
private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo";
private static final String DB_DRIVER = "org.h2.Driver";
@@ -56,18 +56,18 @@ public class MaxPerKeyExamplesTest
private static final String USER_NAME = "root";
private static final String PSW = "password";
private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";";
-
+
private static final double[] MEANTEMPS = {85.3, 75.4};
-
+
@BeforeClass
public static void setup()
{
try {
Class.forName(DB_DRIVER).newInstance();
-
+
Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
Statement stmt = con.createStatement();
-
+
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
@@ -76,53 +76,53 @@ public class MaxPerKeyExamplesTest
+ JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ ")";
stmt.executeUpdate(createMetaTable);
-
+
String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE
+ "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )";
stmt.executeUpdate(createInputTable);
-
+
String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE
+ "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )";
stmt.executeUpdate(createOutputTable);
-
+
String cleanTable = "truncate table " + INPUT_TABLE;
stmt.executeUpdate(cleanTable);
-
+
stmt = con.createStatement();
-
+
String sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)";
stmt.executeUpdate(sql);
sql = "INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)";
stmt.executeUpdate(sql);
sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)";
stmt.executeUpdate(sql);
-
+
} catch (Throwable e) {
throw Throwables.propagate(e);
}
}
-
+
@AfterClass
public static void cleanup()
{
try {
Class.forName(DB_DRIVER).newInstance();
-
+
Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
Statement stmt = con.createStatement();
-
+
String dropInputTable = "DROP TABLE " + INPUT_TABLE;
stmt.executeUpdate(dropInputTable);
-
+
String dropOutputTable = "DROP TABLE " + OUTPUT_TABLE;
stmt.executeUpdate(dropOutputTable);
-
+
} catch (Throwable e) {
throw Throwables.propagate(e);
}
-
+
}
-
+
public void setConfig(Configuration conf)
{
conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME);
@@ -133,7 +133,7 @@ public class MaxPerKeyExamplesTest
conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL);
conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE);
conf.set("dt.operator.jdbcInput.prop.query", QUERY);
-
+
conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
@@ -142,14 +142,14 @@ public class MaxPerKeyExamplesTest
conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE);
}
-
+
public int getNumEntries()
{
Connection con;
try {
con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
Statement stmt = con.createStatement();
-
+
String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE;
ResultSet resultSet = stmt.executeQuery(countQuery);
resultSet.next();
@@ -158,7 +158,7 @@ public class MaxPerKeyExamplesTest
throw new RuntimeException("fetching count", e);
}
}
-
+
public Map<Integer, Double> getMaxMeanTemp()
{
Map<Integer, Double> result = new HashMap<>();
@@ -166,30 +166,30 @@ public class MaxPerKeyExamplesTest
try {
con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
Statement stmt = con.createStatement();
-
+
String countQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE;
ResultSet resultSet = stmt.executeQuery(countQuery);
while (resultSet.next()) {
result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP"));
-
+
}
return result;
} catch (SQLException e) {
throw new RuntimeException("fetching count", e);
}
}
-
+
@Test
public void MaxPerKeyExampleTest() throws Exception
{
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
setConfig(conf);
-
+
MaxPerKeyExamples app = new MaxPerKeyExamples();
-
+
lma.prepareDAG(app, conf);
-
+
LocalMode.Controller lc = lma.getController();
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@@ -199,9 +199,9 @@ public class MaxPerKeyExamplesTest
return getNumEntries() == 2;
}
});
-
+
lc.run(5000);
-
+
double[] result = new double[2];
result[0] = getMaxMeanTemp().get(6);
result[1] = getMaxMeanTemp().get(7);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
index 5f93206..7fbdfd1 100644
--- a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
+++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
@@ -18,23 +18,24 @@
*/
package com.datatorrent.demos.iteration;
-import com.datatorrent.api.Context;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
/**
* Iteration demo : <br>
@@ -64,10 +65,10 @@ import java.io.PrintStream;
*
* @since 3.4.0
*/
-@ApplicationAnnotation(name="IterationDemo")
+@ApplicationAnnotation(name = "IterationDemo")
public class Application implements StreamingApplication
{
- private final static Logger LOG = LoggerFactory.getLogger(Application.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Application.class);
private String extraOutputFileName; // for unit test
public static class FibonacciOperator extends BaseOperator
@@ -117,7 +118,7 @@ public class Application implements StreamingApplication
public void process(Object t)
{
String s = t.toString();
- System.out.println(s);
+ LOG.info(s);
if (extraOutputStream != null) {
extraOutputStream.println(s);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
index 7804fcd..9fb89ac 100644
--- a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
+++ b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java
@@ -18,17 +18,16 @@
*/
package com.datatorrent.demos.iteration;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
-import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
-import com.datatorrent.api.LocalMode;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
/**
*
@@ -61,7 +60,8 @@ public class ApplicationTest
if (file.length() > 50) {
break;
}
- } while (System.currentTimeMillis() - startTime < timeout);
+ }
+ while (System.currentTimeMillis() - startTime < timeout);
lc.shutdown();
try (BufferedReader br = new BufferedReader(new FileReader(outputFileName))) {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
index 32aac35..55b299f 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
@@ -18,22 +18,23 @@
*/
package com.datatorrent.demos.machinedata;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
import com.datatorrent.demos.machinedata.data.MachineKey;
import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingOperator;
import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingPrerequisitesOperator;
-import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
import com.datatorrent.lib.io.SmtpOutputOperator;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* <p>
* Resource monitor application.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
index 77b39b5..75c2a02 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/DimensionGenerator.java
@@ -18,18 +18,13 @@
*/
package com.datatorrent.demos.machinedata;
-import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.demos.machinedata.data.MachineInfo;
import com.datatorrent.demos.machinedata.data.MachineKey;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* <p>
* Information tuple generator with randomness.
@@ -42,9 +37,10 @@ public class DimensionGenerator extends BaseOperator
{
public transient DefaultOutputPort<MachineInfo> outputInline = new DefaultOutputPort<>();
public transient DefaultOutputPort<MachineInfo> output = new DefaultOutputPort<>();
- private int threshold=90;
+ private int threshold = 90;
- public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>() {
+ public final transient DefaultInputPort<MachineInfo> inputPort = new DefaultInputPort<MachineInfo>()
+ {
@Override
public void process(MachineInfo tuple)
@@ -113,9 +109,9 @@ public class DimensionGenerator extends BaseOperator
int hdd = tuple.getHdd();
MachineInfo machineInfo = new MachineInfo();
machineInfo.setMachineKey(machineKey);
- machineInfo.setCpu((cpu < threshold)?cpu:threshold);
- machineInfo.setRam((ram < threshold)?ram:threshold);
- machineInfo.setHdd((hdd < threshold)?hdd:threshold);
+ machineInfo.setCpu((cpu < threshold) ? cpu : threshold);
+ machineInfo.setRam((ram < threshold) ? ram : threshold);
+ machineInfo.setHdd((hdd < threshold) ? hdd : threshold);
outputInline.emit(machineInfo);
output.emit(machineInfo);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
index 560df52..85ec954 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/InputReceiver.java
@@ -18,20 +18,23 @@
*/
package com.datatorrent.demos.machinedata;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.demos.machinedata.data.MachineInfo;
-import com.datatorrent.demos.machinedata.data.MachineKey;
-
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Random;
+import java.util.TimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.demos.machinedata.data.MachineInfo;
+import com.datatorrent.demos.machinedata.data.MachineKey;
+
/**
* <p>
* Information tuple generator with randomness.
@@ -74,6 +77,7 @@ public class InputReceiver extends BaseOperator implements InputOperator
dayDateFormat.setTimeZone(tz);
}
+
@Override
public void setup(Context.OperatorContext context)
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
index 722a77e..2b3bb1c 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineKey.java
@@ -305,25 +305,29 @@ public class MachineKey
if (!(obj instanceof MachineKey)) {
return false;
}
- MachineKey mkey = (MachineKey) obj;
+ MachineKey mkey = (MachineKey)obj;
return checkStringEqual(this.timeKey, mkey.timeKey) && checkStringEqual(this.day, mkey.day) && checkIntEqual(this.customer, mkey.customer) && checkIntEqual(this.product, mkey.product) && checkIntEqual(this.os, mkey.os) && checkIntEqual(this.software1, mkey.software1) && checkIntEqual(this.software2, mkey.software2) && checkIntEqual(this.software3, mkey.software3) && checkIntEqual(this.deviceId, mkey.deviceId);
}
private boolean checkIntEqual(Integer a, Integer b)
{
- if ((a == null) && (b == null))
+ if ((a == null) && (b == null)) {
return true;
- if ((a != null) && a.equals(b))
+ }
+ if ((a != null) && a.equals(b)) {
return true;
+ }
return false;
}
private boolean checkStringEqual(String a, String b)
{
- if ((a == null) && (b == null))
+ if ((a == null) && (b == null)) {
return true;
- if ((a != null) && a.equals(b))
+ }
+ if ((a != null) && a.equals(b)) {
return true;
+ }
return false;
}
@@ -331,20 +335,27 @@ public class MachineKey
public String toString()
{
StringBuilder sb = new StringBuilder(timeKey);
- if (customer != null)
+ if (customer != null) {
sb.append("|0:").append(customer);
- if (product != null)
+ }
+ if (product != null) {
sb.append("|1:").append(product);
- if (os != null)
+ }
+ if (os != null) {
sb.append("|2:").append(os);
- if (software1 != null)
+ }
+ if (software1 != null) {
sb.append("|3:").append(software1);
- if (software2 != null)
+ }
+ if (software2 != null) {
sb.append("|4:").append(software2);
- if (software3 != null)
+ }
+ if (software3 != null) {
sb.append("|5:").append(software3);
- if (deviceId != null)
+ }
+ if (deviceId != null) {
sb.append("|6:").append(deviceId);
+ }
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
----------------------------------------------------------------------
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
index a0b2ecf..d474c5c 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/ResourceType.java
@@ -18,45 +18,49 @@
*/
package com.datatorrent.demos.machinedata.data;
-import com.google.common.collect.Maps;
-
import java.util.Map;
+import com.google.common.collect.Maps;
+
/**
* This class captures the resources whose usage is collected for each device
* <p>ResourceType class.</p>
*
* @since 0.3.5
*/
-public enum ResourceType {
+public enum ResourceType
+{
- CPU("cpu"), RAM("ram"), HDD("hdd");
+ CPU("cpu"), RAM("ram"), HDD("hdd");
- private static Map<String, ResourceType> descToResource = Maps.newHashMap();
+ private static Map<String, ResourceType> descToResource = Maps.newHashMap();
- static {
- for (ResourceType type : ResourceType.values()) {
- descToResource.put(type.desc, type);
- }
+ static {
+ for (ResourceType type : ResourceType.values()) {
+ descToResource.put(type.desc, type);
}
+ }
- private String desc;
+ private String desc;
- private ResourceType(String desc) {
- this.desc = desc;
- }
+ private ResourceType(String desc)
+ {
+ this.desc = desc;
+ }
- @Override
- public String toString() {
- return desc;
- }
+ @Override
+ public String toString()
+ {
+ return desc;
+ }
- /**
- * This method returns ResourceType for the given description
- * @param desc the description
- * @return
- */
- public static ResourceType getResourceTypeOf(String desc) {
- return descToResource.get(desc);
- }
+ /**
+ * This method returns ResourceType for the given description
+ * @param desc the description
+ * @return
+ */
+ public static ResourceType getResourceTypeOf(String desc)
+ {
+ return descToResource.get(desc);
+ }
}