You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/09/24 22:59:08 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1855 TopN word counts
with visualization support
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 d710af9b1 -> ae5f1ede5
MLHR-1855 TopN word counts with visualization support
Fixed per review comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0a5914ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0a5914ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0a5914ce
Branch: refs/heads/devel-3
Commit: 0a5914ce8066159fc71633e8dc4db12abd23d143
Parents: 9194a72
Author: Munagala V. Ramanath <ra...@apache.org>
Authored: Mon Sep 21 06:17:41 2015 -0700
Committer: Munagala V. Ramanath <ra...@apache.org>
Committed: Thu Sep 24 13:45:36 2015 -0700
----------------------------------------------------------------------
demos/wordcount/pom.xml | 8 +
.../wordcount/ApplicationWithQuerySupport.java | 122 ++++++++
.../demos/wordcount/FileWordCount.java | 288 +++++++++++++++++++
.../datatorrent/demos/wordcount/LineReader.java | 95 ++++++
.../com/datatorrent/demos/wordcount/WCPair.java | 34 +++
.../demos/wordcount/WindowWordCount.java | 82 ++++++
.../demos/wordcount/WordCountWriter.java | 90 ++++++
.../datatorrent/demos/wordcount/WordReader.java | 69 +++++
.../src/main/resources/META-INF/properties.xml | 88 +++++-
.../src/main/resources/WordDataSchema.json | 4 +
10 files changed, 869 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/pom.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/pom.xml b/demos/wordcount/pom.xml
index a3834ba..6535c66 100644
--- a/demos/wordcount/pom.xml
+++ b/demos/wordcount/pom.xml
@@ -21,4 +21,12 @@
<skipTests>true</skipTests>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ <version>6.6.4</version>
+ </dependency>
+ </dependencies>
+
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
new file mode 100644
index 0000000..9b0b8d8
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.stream.DevNull;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Assembles the "TopNWordsWithQueries" application DAG.
+ * <p>
+ * The core pipeline is lineReader -> wordReader -> windowWordCount -> fileWordCount -> wcWriter,
+ * with an additional "control" stream from lineReader to fileWordCount. If the DAG has a non-empty
+ * GATEWAY_CONNECT_ADDRESS attribute, two snapshot servers (per-file and global) with their
+ * websocket query/result operators are added as well; otherwise the per-file counts are only
+ * sent to the console operator.
+ */
+@ApplicationAnnotation(name="TopNWordsWithQueries")
+public class ApplicationWithQuerySupport implements StreamingApplication
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class);
+
+  public static final String
+    SNAPSHOT_SCHEMA = "WordDataSchema.json",
+    APP_NAME = "TopNWordsWithQueries";
+
+  /**
+   * Adds all operators and streams of the application to the DAG.
+   *
+   * @param dag  the DAG to populate
+   * @param conf the configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // create operators
+    LineReader lineReader = dag.addOperator("lineReader", new LineReader());
+    WordReader wordReader = dag.addOperator("wordReader", new WordReader());
+    WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount());
+    FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount());
+    WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter());
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+    console.setStringFormat("wordCount: %s");
+
+    // create streams
+    dag.addStream("lines", lineReader.output, wordReader.input);
+    dag.addStream("control", lineReader.control, fileWordCount.control);
+    dag.addStream("words", wordReader.output, windowWordCount.input);
+    dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
+    dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
+
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+
+    if (StringUtils.isEmpty(gatewayAddress)) {
+      // no gateway configured: run without query support, per-file counts go to the console only
+      dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input);
+    } else {
+      addQuerySupport(dag, gatewayAddress, fileWordCount, console);
+    }
+
+    // report through the logger rather than printing to stdout
+    LOG.info("Returning from populateDAG, isDebugEnabled = {}", LOG.isDebugEnabled());
+  }
+
+  // Adds the per-file and global snapshot servers, their websocket query providers and result
+  // operators, and the streams connecting them to fileWordCount and the console.
+  private void addQuerySupport(DAG dag, String gatewayAddress, FileWordCount fileWordCount,
+      ConsoleOutputOperator console)
+  {
+    URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+    AppDataSnapshotServerMap snapshotServerFile
+      = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap());
+    AppDataSnapshotServerMap snapshotServerGlobal
+      = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap());
+
+    // both snapshot servers share the same schema
+    String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
+    snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON);
+    snapshotServerGlobal.setSnapshotSchemaJSON(snapshotServerJSON);
+
+    // query providers are embedded in the snapshot servers, not added to the DAG as operators
+    PubSubWebSocketAppDataQuery wsQueryFile = new PubSubWebSocketAppDataQuery();
+    PubSubWebSocketAppDataQuery wsQueryGlobal = new PubSubWebSocketAppDataQuery();
+    wsQueryFile.setUri(uri);
+    wsQueryGlobal.setUri(uri);
+    snapshotServerFile.setEmbeddableQueryInfoProvider(wsQueryFile);
+    snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal);
+
+    PubSubWebSocketAppDataResult wsResultFile
+      = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult());
+    PubSubWebSocketAppDataResult wsResultGlobal
+      = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult());
+    wsResultFile.setUri(uri);
+    wsResultGlobal.setUri(uri);
+
+    Operator.InputPort<String> queryResultFilePort = wsResultFile.input;
+    Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input;
+
+    dag.addStream("WordCountsFile", fileWordCount.outputPerFile,
+                  snapshotServerFile.input, console.input);
+    dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal,
+                  snapshotServerGlobal.input);
+
+    dag.addStream("ResultFile", snapshotServerFile.queryResult, queryResultFilePort);
+    dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, queryResultGlobalPort);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
new file mode 100644
index 0000000..615fa5c
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
@@ -0,0 +1,288 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+//import org.apache.commons.lang.mutable.MutableInt;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Accumulates word frequency counts for the current file and globally, and emits the top N
+ * (word, frequency) pairs at the end of each window.
+ * Currently designed to work with only 1 file at a time.
+ *
+ * <p>
+ * Receives per-window lists of (word, frequency) pairs on the input port and the name of a
+ * finished file on the control port (the EOF signal). At the endWindow following an EOF, the
+ * final top N for that file is emitted and the per-file state is reset.
+ * <p>
+ * There are 3 output ports: (a) {@code fileOutput}, the final per-file top N, emitted once per
+ * file after EOF; (b) {@code outputPerFile}, the top N for the current file, emitted at every
+ * endWindow in which the file has words; and (c) {@code outputGlobal}, the top N across all
+ * files, emitted in the same windows.
+ *
+ * Since the EOF is received by a single operator, this operator cannot be partitionable
+ */
+public class FileWordCount extends BaseOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
+
+  // orders pairs by descending frequency; Integer.compare avoids the overflow that
+  // subtracting the two frequencies could produce
+  private static final Comparator<WCPair> BY_FREQ_DESC = new Comparator<WCPair>() {
+    @Override
+    public int compare(WCPair o1, WCPair o2) {
+      return Integer.compare(o2.freq, o1.freq);
+    }
+  };
+
+  // If topN > 0, only data for the topN most frequent words is output; otherwise the
+  // entire frequency map is output
+  //
+  protected int topN;
+
+  // set to true when we get an EOF control tuple
+  protected boolean eof = false;
+
+  // name of the file whose EOF was received on the control port; null until then and
+  // again after the final results for that file have been emitted
+  protected String fileName;
+
+  // wordMapFile   : {word => frequency} map, current file, all words
+  // wordMapGlobal : {word => frequency} map, global, all words
+  //
+  protected Map<String, WCPair> wordMapFile = new HashMap<>();
+  protected Map<String, WCPair> wordMapGlobal = new HashMap<>();
+
+  // resultPerFile : list of {"word", "count"} maps for the current file; sent on outputPerFile
+  // resultGlobal  : list of {"word", "count"} maps across all files; sent on outputGlobal
+  //
+  protected transient List<Map<String, Object>> resultPerFile, resultGlobal;
+
+  // singleton map of fileName to sorted list of (word, frequency) pairs
+  protected transient Map<String, Object> resultFileFinal;
+  protected transient List<WCPair> fileFinalList;
+
+  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
+  {
+    @Override
+    public void process(List<WCPair> list)
+    {
+      // blend incoming list into wordMapFile and wordMapGlobal
+      for (WCPair pair : list) {
+        addCount(wordMapFile, pair);
+        addCount(wordMapGlobal, pair);
+      }
+    }
+  };
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String msg)
+    {
+      if (msg.isEmpty()) {    // sanity check
+        throw new RuntimeException("Empty file path");
+      }
+      LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN);
+      fileName = msg;
+      eof = true;
+      // NOTE: current version only supports processing one file at a time.
+    }
+  };
+
+  // outputPerFile -- tuple is the top N list for the current file
+  // outputGlobal  -- tuple is the top N list globally
+  //
+  public final transient DefaultOutputPort<List<Map<String, Object>>>
+    outputPerFile = new DefaultOutputPort<>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<List<Map<String, Object>>>
+    outputGlobal = new DefaultOutputPort<>();
+
+  // fileOutput -- tuple is singleton map {<fileName> => list} where list is the final
+  //               top N for current file; emitted on EOF
+  //
+  public final transient DefaultOutputPort<Map<String, Object>>
+    fileOutput = new DefaultOutputPort<>();
+
+  public int getTopN() {
+    return topN;
+  }
+
+  public void setTopN(int n) {
+    topN = n;
+  }
+
+  /**
+   * Ensures the frequency maps exist and allocates the transient result containers.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    if (null == wordMapFile) {
+      wordMapFile = new HashMap<>();
+    }
+    if (null == wordMapGlobal) {
+      wordMapGlobal = new HashMap<>();
+    }
+    resultPerFile = new ArrayList<>();
+    resultGlobal = new ArrayList<>();
+    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
+    resultFileFinal = new HashMap<>(1);
+    fileFinalList = new ArrayList<>();
+  }
+
+  /**
+   * Emits the current global and per-file top N lists if the current file has any words and,
+   * when an EOF was received, also emits the final per-file list and resets the per-file state.
+   */
+  @Override
+  public void endWindow()
+  {
+    LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN);
+
+    if (wordMapFile.isEmpty()) {    // no words found
+      if (eof) {                    // write empty list to fileOutput port
+        // got EOF, so output empty list to output file
+        fileFinalList.clear();
+        resultFileFinal.put(fileName, fileFinalList);
+        fileOutput.emit(resultFileFinal);
+
+        // reset for next file
+        eof = false;
+        fileName = null;
+        resultFileFinal.clear();
+      }
+      LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", fileName, topN);
+      return;
+    }
+
+    // NOTE: fileName is only set when the EOF control tuple arrives, so it is legitimately
+    // null in every window before EOF; it is needed only in the eof branch below.
+    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}",
+             fileName, wordMapFile.size(), topN);
+
+    // get topN global list and emit to global output port
+    getTopNMap(wordMapGlobal, resultGlobal);
+    LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size());
+    outputGlobal.emit(resultGlobal);
+
+    // get topN list for this file and emit to file output port
+    getTopNMap(wordMapFile, resultPerFile);
+    LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size());
+    outputPerFile.emit(resultPerFile);
+
+    if (eof) {
+      // got EOF, so compute final topN list from wordMapFile into fileFinalList and emit it
+      getTopNList(wordMapFile);
+      resultFileFinal.put(fileName, fileFinalList);
+      fileOutput.emit(resultFileFinal);
+
+      // reset for next file
+      eof = false;
+      fileName = null;
+      wordMapFile.clear();
+      resultFileFinal.clear();
+    }
+  }
+
+  // add the frequency of pair to the entry for its word in map, creating the entry (as a
+  // private copy, since entries are mutated later) if the word is not yet present
+  //
+  private static void addCount(Map<String, WCPair> map, WCPair pair)
+  {
+    final WCPair existing = map.get(pair.word);
+    if (null == existing) {
+      map.put(pair.word, new WCPair(pair.word, pair.freq));
+    } else {
+      existing.freq += pair.freq;
+    }
+  }
+
+  // sort list in descending order of frequency and, if topN > 0, drop everything after the
+  // first topN entries; a list with at most topN entries is left at its full length
+  //
+  private void sortAndTruncate(List<WCPair> list)
+  {
+    Collections.sort(list, BY_FREQ_DESC);
+    if (topN > 0 && list.size() > topN) {
+      list.subList(topN, list.size()).clear();
+    }
+  }
+
+  // get topN frequencies from map, convert each pair to a 2-element map
+  // {"word": <word>, "count": <freq>} and store them in result (previous content is discarded)
+  //
+  private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result)
+  {
+    final List<WCPair> list = new ArrayList<>(map.values());
+    sortAndTruncate(list);
+
+    result.clear();
+    for (WCPair pair : list) {
+      Map<String, Object> wmap = new HashMap<>(2);
+      wmap.put("word", pair.word);
+      wmap.put("count", pair.freq);
+      result.add(wmap);
+    }
+    LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size());
+  }
+
+  // populate fileFinalList with topN frequencies from argument
+  // This list is sent on fileOutput as the value of the singleton map
+  //
+  private void getTopNList(final Map<String, WCPair> map)
+  {
+    fileFinalList.clear();
+    fileFinalList.addAll(map.values());
+    sortAndTruncate(fileFinalList);
+    LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", fileFinalList.size());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
new file mode 100644
index 0000000..b030356
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.hadoop.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+// reads lines from input file and emits them on the output port; when end-of-file is reached,
+// the file name is emitted as a control tuple on the (optional) control port
+//
+public class LineReader extends AbstractFileInputOperator<String>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
+
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();
+
+  // line reader wrapped around the stream of the currently open file; null when no file is open
+  private transient BufferedReader br = null;
+
+  // path of the currently open file; null when no file is open
+  private Path path;
+
+  // opens the file via the superclass and wraps the stream in a line reader
+  @Override
+  protected InputStream openFile(Path curPath) throws IOException
+  {
+    LOG.info("openFile: curPath = {}", curPath);
+    path = curPath;
+    InputStream is = super.openFile(path);
+    // decode explicitly as UTF-8 instead of the platform default charset, matching the
+    // encoding WordCountWriter uses for its output
+    br = new BufferedReader(new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8));
+    return is;
+  }
+
+  // closes the file via the superclass, then releases the line reader; the per-file state is
+  // reset even if the superclass call throws, and a missing reader is tolerated
+  @Override
+  protected void closeFile(InputStream is) throws IOException
+  {
+    try {
+      super.closeFile(is);
+    } finally {
+      final BufferedReader reader = br;
+      br = null;
+      path = null;
+      if (null != reader) {
+        reader.close();
+      }
+    }
+  }
+
+  // returns the next line of the current file, or null at end-of-file
+  @Override
+  protected String readEntity() throws IOException
+  {
+    // try to read a line
+    final String line = br.readLine();
+    if (null != line) {    // common case
+      LOG.debug("readEntity: line = {}", line);
+      return line;
+    }
+
+    // end-of-file; send control tuple, containing only the last component of the path
+    // (only file name) on control port
+    //
+    if (control.isConnected()) {
+      LOG.info("readEntity: EOF for {}", path);
+      final String name = path.getName();    // final component of path
+      control.emit(name);
+    }
+
+    return null;
+  }
+
+  @Override
+  protected void emit(String tuple)
+  {
+    output.emit(tuple);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
new file mode 100644
index 0000000..0eb1e72
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+// a single (word, frequency) pair
+public class WCPair {
+ public String word;
+ public int freq;
+
+ public WCPair() {}
+
+ public WCPair(String w, int f) {
+ word = w;
+ freq = f;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(%s, %d)", word, freq);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
new file mode 100644
index 0000000..93f4f2f
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
+
+// Computes word frequency counts per window and emits them at each endWindow. The output is a
+// list of pairs (word, frequency).
+//
+public class WindowWordCount extends BaseOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);
+
+  // wordMap : word => (word, frequency) pair for the current window
+  protected Map<String, WCPair> wordMap = new HashMap<>();
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    // counts one occurrence of the incoming word
+    @Override
+    public void process(String word)
+    {
+      WCPair pair = wordMap.get(word);
+      if (null != pair) {    // word seen previously
+        pair.freq += 1;
+        return;
+      }
+
+      // new word
+      pair = new WCPair();
+      pair.word = word;
+      pair.freq = 1;
+      wordMap.put(word, pair);
+    }
+  };
+
+  // output port which emits the list of (word, freq) pairs for the current window
+  //
+  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();
+
+  // emits the counts accumulated in this window (nothing if no words arrived) and resets
+  // the map for the next window
+  @Override
+  public void endWindow()
+  {
+    LOG.info("WindowWordCount: endWindow");
+
+    // no words in this window; nothing to emit
+    if (wordMap.isEmpty()) return;
+
+    // emit a fresh list of the pairs. The list is deliberately NOT cleared afterwards: it has
+    // been handed to the output port, and mutating an emitted tuple could alter what a
+    // downstream operator sees if the tuple is delivered by reference.
+    final List<WCPair> list = new ArrayList<>(wordMap.values());
+    output.emit(list);
+    wordMap.clear();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
new file mode 100644
index 0000000..c1c58ef
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.io.UnsupportedEncodingException;
+
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+// write top N words and their frequencies to a file
+//
+public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
+  // a Charset constant rather than a charset name, so encoding cannot throw
+  // UnsupportedEncodingException
+  private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+  private static final String nl = System.lineSeparator();
+
+  private String fileName;    // current file name
+  private transient final StringBuilder sb = new StringBuilder();
+
+  // requests finalization of the most recently written file (if any) at the end of each window
+  @Override
+  public void endWindow()
+  {
+    if (null != fileName) {
+      requestFinalize(fileName);
+    }
+    super.endWindow();
+  }
+
+  // input is a singleton map {fileName => L} where L is a list of pairs: (word, frequency);
+  // the key of its first entry is remembered and returned as the output file name
+  //
+  @Override
+  protected String getFileName(Map<String, Object> tuple)
+  {
+    LOG.info("getFileName: tuple.size = {}", tuple.size());
+
+    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
+    fileName = entry.getKey();
+    LOG.info("getFileName: fileName = {}", fileName);
+    return fileName;
+  }
+
+  // formats the list of pairs in the tuple as lines "<word> : <freq>" and returns them
+  // encoded as UTF-8
+  //
+  @Override
+  protected byte[] getBytesForTuple(Map<String, Object> tuple)
+  {
+    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
+
+    // get first and only pair; key is the fileName and is ignored here
+    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
+    // the value is declared as Object; this operator expects it to be a list of WCPair
+    @SuppressWarnings("unchecked")
+    final List<WCPair> list = (List<WCPair>) entry.getValue();
+
+    sb.setLength(0);    // clear buffer left over from the previous tuple
+
+    for (WCPair pair : list) {
+      sb.append(pair.word).append(" : ").append(pair.freq).append(nl);
+    }
+
+    final String data = sb.toString();
+    LOG.info("getBytesForTuple: data = {}", data);
+    return data.getBytes(CHARSET);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
new file mode 100644
index 0000000..b509540
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.util.regex.Pattern;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+// splits each incoming line into words and emits the non-empty ones individually
+public class WordReader extends BaseOperator
+{
+  // separator pattern used when no custom regex has been configured
+  private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");
+
+  private String nonWordStr;            // configurable regex
+  private transient Pattern nonWord;    // compiled regex, chosen in setup()
+
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+  public final transient DefaultInputPort<String>
+    input = new DefaultInputPort<String>() {
+
+    @Override
+    public void process(String line)
+    {
+      // break the line at separators; empty fragments are skipped
+      for (String token : nonWord.split(line)) {
+        if (!token.isEmpty()) {
+          output.emit(token);
+        }
+      }
+    }
+  };
+
+  public String getNonWordStr() {
+    return nonWordStr;
+  }
+
+  public void setNonWordStr(String regex) {
+    nonWordStr = regex;
+  }
+
+  // selects the separator pattern: the default if no regex is configured, otherwise the
+  // compiled form of the configured regex
+  @Override
+  public void setup(OperatorContext context)
+  {
+    nonWord = (null == nonWordStr) ? nonWordDefault : Pattern.compile(nonWordStr);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/META-INF/properties.xml b/demos/wordcount/src/main/resources/META-INF/properties.xml
index aa8d0bf..aaee620 100644
--- a/demos/wordcount/src/main/resources/META-INF/properties.xml
+++ b/demos/wordcount/src/main/resources/META-INF/properties.xml
@@ -1,12 +1,78 @@
<configuration>
- <property>
- <name>dt.application.WordCountDemo.operator.wordinput.fileName</name>
- <value>samplefile.txt</value>
- </property>
- <property>
- <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name>
- <value>CONTAINER_LOCAL</value>
- <description>Specify container locality for the viewtuplecount stream
- </description>
- </property>
-</configuration>
\ No newline at end of file
+ <!-- TopNWordsWithQueries -->
+
+ <!-- for debugging -->
+ <!--
+ <property>
+ <name>dt.attr.CONTAINER_JVM_OPTIONS</name>
+ <value>-Dlog4j.configuration=my_log4j.properties</value>
+ </property>
+ -->
+
+ <!-- monitored input directory -->
+ <property>
+ <name>dt.application.TopNWordsWithQueries.operator.lineReader.directory</name>
+ <value>/tmp/test/input-dir</value>
+ </property>
+
+ <!-- regular expression for word separator -->
+ <property>
+ <name>dt.application.TopNWordsWithQueries.operator.wordReader.nonWordStr</name>
+ <value>[\p{Punct}\s]+</value>
+ </property>
+
+ <!-- output directory for word counts -->
+ <property>
+ <name>dt.application.TopNWordsWithQueries.operator.wcWriter.filePath</name>
+ <value>/tmp/test/output-dir</value>
+ </property>
+
+ <!-- Top N value -->
+ <property>
+ <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.topN</name>
+ <value>10</value>
+ </property>
+
+ <!-- topic for queries (current file) -->
+ <property>
+ <name>dt.application.TopNWordsWithQueries.operator.snapshotServerFile.embeddableQueryInfoProvider.topic</name>
+ <value>TopNWordsQueryFile</value>
+ </property>
+
+ <!-- topic for query results (current file) -->
+ <property>
+ <name>dt.application.TopNWordsWithQueries.operator.wsResultFile.topic</name>
+ <value>TopNWordsQueryFileResult</value>
+ </property>
+
+ <!-- topic for queries (global) -->
+ <property>
+ <name>dt.application.TopNWordsWithQueries.operator.snapshotServerGlobal.embeddableQueryInfoProvider.topic</name>
+ <value>TopNWordsQueryGlobal</value>
+ </property>
+
+ <!-- topic for query results (global) -->
+ <property>
+ <name>dt.application.TopNWordsWithQueries.operator.wsResultGlobal.topic</name>
+ <value>TopNWordsQueryGlobalResult</value>
+ </property>
+
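+ <!-- retry count; NOTE(review): this key names application TwitterDemo and operator wsResult, unlike the TopNWordsWithQueries keys above (wsResultFile / wsResultGlobal); confirm it is not a copy-paste leftover -->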
+ <property>
+ <name>dt.application.TwitterDemo.operator.wsResult.numRetries</name>
+ <value>2147483647</value>
+ </property>
+
+
+ <!-- WordCountDemo -->
+ <property>
+ <name>dt.application.WordCountDemo.operator.wordinput.fileName</name>
+ <value>samplefile.txt</value>
+ </property>
+ <property>
+ <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name>
+ <value>CONTAINER_LOCAL</value>
+ <description>Specify container locality for the viewtuplecount stream
+ </description>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/resources/WordDataSchema.json
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/WordDataSchema.json b/demos/wordcount/src/main/resources/WordDataSchema.json
new file mode 100644
index 0000000..5e8e7c0
--- /dev/null
+++ b/demos/wordcount/src/main/resources/WordDataSchema.json
@@ -0,0 +1,4 @@
+{
+ "values": [{"name": "word", "type": "string"},
+ {"name": "count", "type": "integer"}]
+}
[2/2] incubator-apex-malhar git commit: Merge branch 'TopNWords' into
devel-3
Posted by ti...@apache.org.
Merge branch 'TopNWords' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ae5f1ede
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ae5f1ede
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ae5f1ede
Branch: refs/heads/devel-3
Commit: ae5f1ede5260e59bb1c2943cbf62d26d58a69b24
Parents: d710af9 0a5914c
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Sep 24 13:56:31 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Sep 24 13:56:31 2015 -0700
----------------------------------------------------------------------
demos/wordcount/pom.xml | 8 +
.../wordcount/ApplicationWithQuerySupport.java | 122 ++++++++
.../demos/wordcount/FileWordCount.java | 288 +++++++++++++++++++
.../datatorrent/demos/wordcount/LineReader.java | 95 ++++++
.../com/datatorrent/demos/wordcount/WCPair.java | 34 +++
.../demos/wordcount/WindowWordCount.java | 82 ++++++
.../demos/wordcount/WordCountWriter.java | 90 ++++++
.../datatorrent/demos/wordcount/WordReader.java | 69 +++++
.../src/main/resources/META-INF/properties.xml | 88 +++++-
.../src/main/resources/WordDataSchema.json | 4 +
10 files changed, 869 insertions(+), 11 deletions(-)
----------------------------------------------------------------------