You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/09/24 22:59:08 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1855 TopN word counts with visualization support

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 d710af9b1 -> ae5f1ede5


MLHR-1855 TopN word counts with visualization support

  Fixed per review comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0a5914ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0a5914ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0a5914ce

Branch: refs/heads/devel-3
Commit: 0a5914ce8066159fc71633e8dc4db12abd23d143
Parents: 9194a72
Author: Munagala V. Ramanath <ra...@apache.org>
Authored: Mon Sep 21 06:17:41 2015 -0700
Committer: Munagala V. Ramanath <ra...@apache.org>
Committed: Thu Sep 24 13:45:36 2015 -0700

----------------------------------------------------------------------
 demos/wordcount/pom.xml                         |   8 +
 .../wordcount/ApplicationWithQuerySupport.java  | 122 ++++++++
 .../demos/wordcount/FileWordCount.java          | 288 +++++++++++++++++++
 .../datatorrent/demos/wordcount/LineReader.java |  95 ++++++
 .../com/datatorrent/demos/wordcount/WCPair.java |  34 +++
 .../demos/wordcount/WindowWordCount.java        |  82 ++++++
 .../demos/wordcount/WordCountWriter.java        |  90 ++++++
 .../datatorrent/demos/wordcount/WordReader.java |  69 +++++
 .../src/main/resources/META-INF/properties.xml  |  88 +++++-
 .../src/main/resources/WordDataSchema.json      |   4 +
 10 files changed, 869 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/pom.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/pom.xml b/demos/wordcount/pom.xml
index a3834ba..6535c66 100644
--- a/demos/wordcount/pom.xml
+++ b/demos/wordcount/pom.xml
@@ -21,4 +21,12 @@
     <skipTests>true</skipTests>
   </properties>
 
+  <dependencies>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>6.6.4</version>
+    </dependency>
+  </dependencies>
+
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
new file mode 100644
index 0000000..9b0b8d8
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.stream.DevNull;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Application that reads lines from files, splits them into words, counts word frequencies
+ * per window, per file and globally, writes the per-file counts to an output file and, when a
+ * gateway address is configured, serves the per-file and global counts through snapshot
+ * servers that answer queries over a pubsub websocket.
+ */
+@ApplicationAnnotation(name="TopNWordsWithQueries")
+public class ApplicationWithQuerySupport implements StreamingApplication
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class);
+
+  public static final String
+    SNAPSHOT_SCHEMA   = "WordDataSchema.json",
+    APP_NAME          = "TopNWordsWithQueries";
+
+  /**
+   * Adds the operators and streams of the word count pipeline to the DAG.
+   *
+   * @param dag  DAG to populate
+   * @param conf configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // create operators
+    LineReader lineReader            = dag.addOperator("lineReader", new LineReader());
+    WordReader wordReader            = dag.addOperator("wordReader", new WordReader());
+    WindowWordCount windowWordCount  = dag.addOperator("windowWordCount", new WindowWordCount());
+    FileWordCount fileWordCount      = dag.addOperator("fileWordCount", new FileWordCount());
+    WordCountWriter wcWriter         = dag.addOperator("wcWriter", new WordCountWriter());
+    ConsoleOutputOperator console    = dag.addOperator("console", new ConsoleOutputOperator());
+    console.setStringFormat("wordCount: %s");
+
+    // create streams
+
+    dag.addStream("lines",   lineReader.output,  wordReader.input);
+    dag.addStream("control", lineReader.control, fileWordCount.control);
+    dag.addStream("words",   wordReader.output,  windowWordCount.input);
+    dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
+    dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
+
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+
+    if ( ! StringUtils.isEmpty(gatewayAddress)) {        // add query support
+      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+      // one snapshot server for the current file and one for the global counts; both use
+      // the same schema
+      AppDataSnapshotServerMap snapshotServerFile
+        = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap());
+      AppDataSnapshotServerMap snapshotServerGlobal
+        = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap());
+
+      String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
+      snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON);
+      snapshotServerGlobal.setSnapshotSchemaJSON(snapshotServerJSON);
+
+      // the query operators are embedded in the snapshot servers rather than added to the DAG
+      PubSubWebSocketAppDataQuery
+        wsQueryFile = new PubSubWebSocketAppDataQuery(),
+        wsQueryGlobal = new PubSubWebSocketAppDataQuery();
+      wsQueryFile.setUri(uri);
+      wsQueryGlobal.setUri(uri);
+
+      snapshotServerFile.setEmbeddableQueryInfoProvider(wsQueryFile);
+      snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal);
+
+      PubSubWebSocketAppDataResult wsResultFile
+        = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult());
+      PubSubWebSocketAppDataResult wsResultGlobal
+        = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult());
+      wsResultFile.setUri(uri);
+      wsResultGlobal.setUri(uri);
+
+      Operator.InputPort<String> queryResultFilePort = wsResultFile.input;
+      Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input;
+
+      dag.addStream("WordCountsFile", fileWordCount.outputPerFile,
+                    snapshotServerFile.input, console.input);
+      dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal,
+                    snapshotServerGlobal.input);
+
+      dag.addStream("ResultFile", snapshotServerFile.queryResult, queryResultFilePort);
+      dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, queryResultGlobalPort);
+    } else {
+      // no gateway address: run without query support and only print the per-file counts
+      dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input);
+    }
+
+    // report through the logger instead of writing directly to stdout
+    LOG.info("done with populateDAG, isDebugEnabled = {}", LOG.isDebugEnabled());
+    LOG.info("Returning from populateDAG");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
new file mode 100644
index 0000000..615fa5c
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
@@ -0,0 +1,288 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+//import org.apache.commons.lang.mutable.MutableInt;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Computes word frequency counts per file and globally from the per-window counts it receives,
+ * and emits the top N pairs for writing to an output file and for snapshot servers.
+ * Currently designed to work with only 1 file at a time; will be enhanced later to support
+ * multiple files dropped into the monitored directory at the same time.
+ *
+ * <p>
+ * Receives per-window list of pairs (word, frequency) on the input port. When the end of a file
+ * is reached, expects to get an EOF on the control port; at the next endWindow, the top N words
+ * and frequencies are computed and emitted to the output ports.
+ * <p>
+ * There are 3 output ports: (a) One for the per-file top N counts emitted when the file is fully
+ * read and is written to the output file. (b) One for the top N counts emitted per window for the
+ * current file to the snapshot server and (c) One for the global top N counts emitted per window
+ * to a different snapshot server.
+ *
+ * Since the EOF is received by a single operator, this operator cannot be partitionable
+ */
+public class FileWordCount extends BaseOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
+
+  // orders pairs by descending frequency; Integer.compare is used instead of subtracting
+  // the two counts so the result is correct for any pair of int values
+  private static final Comparator<WCPair> BY_FREQ_DESC = new Comparator<WCPair>()
+  {
+    @Override
+    public int compare(WCPair o1, WCPair o2)
+    {
+      return Integer.compare(o2.freq, o1.freq);
+    }
+  };
+
+  // If topN > 0, only data for the topN most frequent words is output; if topN == 0, the
+  // entire frequency map is output
+  //
+  protected int topN;
+
+  // set to true when we get an EOF control tuple
+  protected boolean eof = false;
+
+  // last component of path (i.e. only file name)
+  // incoming value from control tuple; null until the EOF for the current file arrives
+  protected String fileName;
+
+  // wordMapFile   : {word => frequency} map, current file, all words
+  // wordMapGlobal : {word => frequency} map, global, all words
+  //
+  protected Map<String, WCPair> wordMapFile = new HashMap<>();
+  protected Map<String, WCPair> wordMapGlobal = new HashMap<>();
+
+  // resultPerFile : list of {"word", "count"} maps for the current file; sent on outputPerFile
+  // resultGlobal  : list of {"word", "count"} maps for all files; sent on outputGlobal
+  //
+  protected transient List<Map<String, Object>> resultPerFile, resultGlobal;
+
+  // singleton map of fileName to sorted list of (word, frequency) pairs
+  protected transient Map<String, Object> resultFileFinal;
+  protected transient List<WCPair> fileFinalList;
+
+  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
+  {
+    @Override
+    public void process(List<WCPair> list)
+    {
+      // blend incoming list into wordMapFile and wordMapGlobal
+      for (WCPair pair : list) {
+        final String word = pair.word;
+        WCPair filePair = wordMapFile.get(word);
+        if (null != filePair) {    // word seen previously in current file
+          // every word added to wordMapFile is also added to wordMapGlobal, and the global
+          // map is never cleared, so this lookup cannot return null
+          WCPair globalPair = wordMapGlobal.get(word);
+          filePair.freq += pair.freq;
+          globalPair.freq += pair.freq;
+          continue;
+        }
+
+        // new word in current file
+        filePair = new WCPair(word, pair.freq);
+        wordMapFile.put(word, filePair);
+
+        // check global map
+        WCPair globalPair = wordMapGlobal.get(word);    // may be null
+        if (null != globalPair) {    // word seen previously
+          globalPair.freq += pair.freq;
+          continue;
+        }
+
+        // word never seen before
+        globalPair = new WCPair(word, pair.freq);
+        wordMapGlobal.put(word, globalPair);
+      }
+    }
+  };
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String msg)
+    {
+      if (msg.isEmpty()) {    // sanity check
+        throw new RuntimeException("Empty file path");
+      }
+      LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN);
+      fileName = msg;
+      eof = true;
+      // NOTE: current version only supports processing one file at a time.
+    }
+  };
+
+  // outputPerFile -- tuple is TopNMap for current file
+  // outputGlobal --  tuple is TopNMap globally
+  //
+  public final transient DefaultOutputPort<List<Map<String, Object>>>
+    outputPerFile = new DefaultOutputPort<>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<List<Map<String, Object>>>
+    outputGlobal = new DefaultOutputPort<>();
+
+  // fileOutput -- tuple is singleton map {<fileName> => TopNMap} where TopNMap is the final
+  //               top N for current file; emitted on EOF
+  //
+  public final transient DefaultOutputPort<Map<String, Object>>
+    fileOutput = new DefaultOutputPort<>();
+
+  public int getTopN() {
+    return topN;
+  }
+
+  public void setTopN(int n) {
+    topN = n;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    if (null == wordMapFile) {
+      wordMapFile = new HashMap<>();
+    }
+    if (null == wordMapGlobal) {
+      wordMapGlobal = new HashMap<>();
+    }
+    resultPerFile = new ArrayList<>(1);
+    resultGlobal = new ArrayList<>(1);
+    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
+    resultFileFinal = new HashMap<>(1);
+    fileFinalList = new ArrayList<>();
+  }
+
+  /**
+   * Emits the global and per-file top N lists for the words seen so far and, if an EOF was
+   * received, also emits the final per-file list on fileOutput and resets the per-file state.
+   */
+  @Override
+  public void endWindow()
+  {
+    LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN);
+
+    if (wordMapFile.isEmpty()) {    // no words found
+      // logged before the reset below so that the file name is still available
+      LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", fileName, topN);
+      if (eof) {
+        // got EOF, so output empty list to output file
+        fileFinalList.clear();
+        resultFileFinal.put(fileName, fileFinalList);
+        fileOutput.emit(resultFileFinal);
+
+        // reset for next file
+        eof = false;
+        fileName = null;
+        resultFileFinal.clear();
+      }
+      return;
+    }
+
+    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}",
+             fileName, wordMapFile.size(), topN);
+
+    // fileName is assigned only when the EOF control tuple arrives, so it is legitimately
+    // null while a file is still being read; it is needed only in the EOF branch below
+
+    // get topN global list and emit to global output port
+    getTopNMap(wordMapGlobal, resultGlobal);
+    LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size());
+    outputGlobal.emit(resultGlobal);
+
+    // get topN list for this file and emit to file output port
+    getTopNMap(wordMapFile, resultPerFile);
+    LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size());
+    outputPerFile.emit(resultPerFile);
+
+    if (eof) {
+      // got EOF, so compute final topN list from wordMapFile into fileFinalList and emit it
+      getTopNList(wordMapFile);
+      resultFileFinal.put(fileName, fileFinalList);
+      fileOutput.emit(resultFileFinal);
+
+      // reset for next file
+      eof = false;
+      fileName = null;
+      wordMapFile.clear();
+      resultFileFinal.clear();
+    }
+  }
+
+  // drop everything after the first topN entries; a no-op when topN is 0 (output everything)
+  // or when the list has no more than topN entries
+  //
+  private void truncateToTopN(final List<WCPair> list)
+  {
+    if (topN > 0 && topN < list.size()) {
+      list.subList(topN, list.size()).clear();
+    }
+  }
+
+  // get topN frequencies from map, convert each pair to a two-entry map and append to result
+  // (result is cleared first); the result is what is sent to the snapshot server ports
+  //
+  private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result)
+  {
+    final List<WCPair> list = new ArrayList<>(map.values());
+
+    // sort entries in descending order of frequency and keep only the top N
+    Collections.sort(list, BY_FREQ_DESC);
+    truncateToTopN(list);
+
+    // convert each pair (word, freq) of list to a map with 2 elements
+    // {("word": <word>, "count": freq)} and append to result
+    //
+    result.clear();
+    for (WCPair pair : list) {
+      Map<String, Object> wmap = new HashMap<>(2);
+      wmap.put("word", pair.word);
+      wmap.put("count", pair.freq);
+      result.add(wmap);
+    }
+    LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size());
+  }
+
+  // populate fileFinalList with topN frequencies from argument
+  // This list is emitted on fileOutput for writing to a file
+  //
+  private void getTopNList(final Map<String, WCPair> map)
+  {
+    fileFinalList.clear();
+    fileFinalList.addAll(map.values());
+
+    // sort entries in descending order of frequency and keep only the top N
+    Collections.sort(fileFinalList, BY_FREQ_DESC);
+    truncateToTopN(fileFinalList);
+    LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", fileFinalList.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
new file mode 100644
index 0000000..b030356
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.hadoop.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * Reads lines from the input file and emits them on the output port; when end-of-file is
+ * reached, the file name is emitted on the control port (if that port is connected).
+ */
+public class LineReader extends AbstractFileInputOperator<String>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
+
+  // charset used to decode input files; named explicitly so that decoding does not depend on
+  // the platform default
+  private static final String CHARSET_NAME = "UTF-8";
+
+  public final transient DefaultOutputPort<String> output  = new DefaultOutputPort<>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();
+
+  // reader wrapped around the stream returned by openFile
+  private transient BufferedReader br = null;
+
+  // path of the file currently being read; null when no file is open
+  private Path path;
+
+  /**
+   * Opens the file through the superclass and wraps the stream in a line reader.
+   *
+   * @param curPath path of the file to open
+   * @return the stream returned by the superclass
+   * @throws IOException if the file cannot be opened or the charset is not supported
+   */
+  @Override
+  protected InputStream openFile(Path curPath) throws IOException
+  {
+    LOG.info("openFile: curPath = {}", curPath);
+    path = curPath;
+    InputStream is = super.openFile(path);
+    br = new BufferedReader(new InputStreamReader(is, CHARSET_NAME));
+    return is;
+  }
+
+  /**
+   * Closes the file through the superclass, then closes and discards the line reader.
+   *
+   * @param is stream previously returned by openFile
+   * @throws IOException if closing fails
+   */
+  @Override
+  protected void closeFile(InputStream is) throws IOException
+  {
+    super.closeFile(is);
+    if (null != br) {    // guard against a close without a preceding successful open
+      br.close();
+      br = null;
+    }
+    path = null;
+  }
+
+  /**
+   * Reads the next line of the current file.
+   *
+   * @return the next line, or null at end-of-file
+   * @throws IOException if reading fails
+   */
+  @Override
+  protected String readEntity() throws IOException
+  {
+    // try to read a line
+    final String line = br.readLine();
+    if (null != line) {    // common case
+      LOG.debug("readEntity: line = {}", line);
+      return line;
+    }
+
+    // end-of-file; send control tuple, containing only the last component of the path
+    // (only file name) on control port
+    //
+    if (control.isConnected()) {
+      LOG.info("readEntity: EOF for {}", path);
+      final String name = path.getName();    // final component of path
+      control.emit(name);
+    }
+
+    return null;
+  }
+
+  @Override
+  protected void emit(String tuple)
+  {
+    output.emit(tuple);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
new file mode 100644
index 0000000..0eb1e72
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+/**
+ * A single (word, frequency) pair with publicly accessible, mutable fields.
+ */
+public class WCPair
+{
+  public String word;
+  public int freq;
+
+  /**
+   * Creates a pair with a null word and a frequency of 0; the fields are expected to be
+   * assigned afterwards.
+   */
+  public WCPair()
+  {
+  }
+
+  /**
+   * Creates a pair for the given word and frequency.
+   *
+   * @param w the word
+   * @param f the frequency of the word
+   */
+  public WCPair(String w, int f)
+  {
+    this.word = w;
+    this.freq = f;
+  }
+
+  /**
+   * @return the pair rendered as "(word, freq)"
+   */
+  @Override
+  public String toString()
+  {
+    final String pattern = "(%s, %d)";
+    return String.format(pattern, this.word, this.freq);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
new file mode 100644
index 0000000..93f4f2f
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Computes word frequency counts per window and emits them at each endWindow. The output is a
+ * list of pairs (word, frequency).
+ */
+public class WindowWordCount extends BaseOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);
+
+  // wordMap : word => (word, frequency) pair for the current window
+  protected Map<String, WCPair> wordMap = new HashMap<>();
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String word)
+    {
+      WCPair pair = wordMap.get(word);
+      if (null != pair) {    // word seen previously in this window
+        pair.freq += 1;
+        return;
+      }
+
+      // new word
+      wordMap.put(word, new WCPair(word, 1));
+    }
+  };
+
+  // output port which emits the list of (word, freq) pairs for the current window
+  //
+  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();
+
+  /**
+   * Emits the counts accumulated during the window as a single list and resets the map for
+   * the next window; emits nothing if no words were received.
+   */
+  @Override
+  public void endWindow()
+  {
+    LOG.info("WindowWordCount: endWindow");
+
+    // if no words were seen in this window, do nothing
+    if (wordMap.isEmpty()) {
+      return;
+    }
+
+    // have some words; emit them as a single list and reset for the next window. The list
+    // is a fresh copy that is not touched after the emit, so the emitted tuple stays intact.
+    // NOTE(review): clearing it after the emit would matter only if a consumer receives the
+    // tuple by reference rather than as a serialized copy -- confirm against the platform's
+    // stream locality documentation.
+    final List<WCPair> list = new ArrayList<>(wordMap.values());
+    output.emit(list);
+    wordMap.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
new file mode 100644
index 0000000..c1c58ef
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.io.UnsupportedEncodingException;
+
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+/**
+ * Writes words and their frequencies to a file named by the incoming tuple.
+ */
+public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
+  private static final String charsetName = "UTF-8";
+  private static final String nl = System.lineSeparator();
+
+  // name taken from the most recent tuple; null until the first tuple arrives
+  private String fileName;
+
+  // scratch buffer reused for every tuple
+  private transient final StringBuilder sb = new StringBuilder();
+
+  /**
+   * Requests finalization of the most recently named file (if any) before delegating to
+   * the superclass.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (fileName != null) {
+      requestFinalize(fileName);
+    }
+    super.endWindow();
+  }
+
+  /**
+   * Uses the key of the first entry of the tuple as the output file name and remembers it.
+   *
+   * @param tuple map whose first key is the file name
+   * @return the file name
+   */
+  @Override
+  protected String getFileName(Map<String, Object> tuple)
+  {
+    LOG.info("getFileName: tuple.size = {}", tuple.size());
+
+    fileName = tuple.entrySet().iterator().next().getKey();
+    LOG.info("getFileName: fileName = {}", fileName);
+    return fileName;
+  }
+
+  /**
+   * Renders the value of the first entry of the tuple, a list of (word, frequency) pairs,
+   * as one "word : frequency" line per pair.
+   *
+   * @param tuple map whose first value is the list of pairs; the key is ignored here
+   * @return the rendered lines encoded as UTF-8
+   */
+  @Override
+  protected byte[] getBytesForTuple(Map<String, Object> tuple)
+  {
+    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
+
+    // only the first entry is used
+    final Object value = tuple.entrySet().iterator().next().getValue();
+    final List<WCPair> pairs = (List<WCPair>) value;
+
+    sb.setLength(0);    // discard whatever the previous tuple left in the buffer
+    for (WCPair item : pairs) {
+      sb.append(item.word).append(" : ").append(item.freq).append(nl);
+    }
+
+    final String data = sb.toString();
+    LOG.info("getBytesForTuple: data = {}", data);
+    try {
+      return data.getBytes(charsetName);
+    } catch (UnsupportedEncodingException ex) {
+      throw new RuntimeException("Should never get here", ex);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
new file mode 100644
index 0000000..b509540
--- /dev/null
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.wordcount;
+
+import java.util.regex.Pattern;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+// splits each incoming line into words and emits the non-empty ones
+public class WordReader extends BaseOperator
+{
+  // word-separator pattern used when no regex has been configured
+  private static final Pattern DEFAULT_SEPARATOR = Pattern.compile("[\\p{Punct}\\s]+");
+
+  private String nonWordStr;    // configurable word-separator regex; may be null
+  private transient Pattern nonWord;      // separator pattern in effect, chosen in setup()
+
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+  public final transient DefaultInputPort<String>
+    input = new DefaultInputPort<String>() {
+
+    @Override
+    public void process(String line)
+    {
+      // emit every non-empty piece found between separators
+      for (final String token : nonWord.split(line)) {
+        if (!token.isEmpty()) {
+          output.emit(token);
+        }
+      }
+    }
+  };
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    // a configured regex takes precedence over the built-in default
+    nonWord = (null == nonWordStr) ? DEFAULT_SEPARATOR : Pattern.compile(nonWordStr);
+  }
+
+  public String getNonWordStr()
+  {
+    return nonWordStr;
+  }
+
+  public void setNonWordStr(String regex)
+  {
+    nonWordStr = regex;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/META-INF/properties.xml b/demos/wordcount/src/main/resources/META-INF/properties.xml
index aa8d0bf..aaee620 100644
--- a/demos/wordcount/src/main/resources/META-INF/properties.xml
+++ b/demos/wordcount/src/main/resources/META-INF/properties.xml
@@ -1,12 +1,78 @@
 <configuration>
-	<property>
-		<name>dt.application.WordCountDemo.operator.wordinput.fileName</name>
-		<value>samplefile.txt</value>
-	</property>
-	<property>
-		<name>dt.application.WordCountDemo.stream.wordinput.count.locality</name>
-		<value>CONTAINER_LOCAL</value>
-		<description>Specify container locality for the viewtuplecount stream
-		</description>
-	</property>
-</configuration>
\ No newline at end of file
+  <!-- TopNWordsWithQueries -->
+
+  <!-- for debugging -->
+  <!--
+  <property>
+    <name>dt.attr.CONTAINER_JVM_OPTIONS</name>
+    <value>-Dlog4j.configuration=my_log4j.properties</value>
+  </property>
+  -->
+
+  <!-- monitored input directory -->
+  <property>
+    <name>dt.application.TopNWordsWithQueries.operator.lineReader.directory</name>
+    <value>/tmp/test/input-dir</value>
+  </property>
+
+  <!-- regular expression for word separator -->
+  <property>
+    <name>dt.application.TopNWordsWithQueries.operator.wordReader.nonWordStr</name>
+    <value>[\p{Punct}\s]+</value>
+  </property>
+
+  <!-- output directory for word counts -->
+  <property>
+    <name>dt.application.TopNWordsWithQueries.operator.wcWriter.filePath</name>
+    <value>/tmp/test/output-dir</value>
+  </property>
+
+  <!-- Top N value -->
+  <property>
+    <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.topN</name>
+    <value>10</value>
+  </property>
+
+  <!-- topic for queries (current file) -->
+  <property>
+    <name>dt.application.TopNWordsWithQueries.operator.snapshotServerFile.embeddableQueryInfoProvider.topic</name>
+    <value>TopNWordsQueryFile</value>
+  </property>
+
+  <!-- topic for query results (current file)  -->
+  <property>
+    <name>dt.application.TopNWordsWithQueries.operator.wsResultFile.topic</name>
+    <value>TopNWordsQueryFileResult</value>
+  </property>
+
+  <!-- topic for queries (global) -->
+  <property>
+    <name>dt.application.TopNWordsWithQueries.operator.snapshotServerGlobal.embeddableQueryInfoProvider.topic</name>
+    <value>TopNWordsQueryGlobal</value>
+  </property>
+
+  <!-- topic for query results (global)  -->
+  <property>
+    <name>dt.application.TopNWordsWithQueries.operator.wsResultGlobal.topic</name>
+    <value>TopNWordsQueryGlobalResult</value>
+  </property>
+
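+  <!-- retry count; NOTE(review): the property below names application TwitterDemo and operator wsResult, unlike the TopNWordsWithQueries properties above - looks copied from another demo, confirm the intended name -->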
+  <property>
+    <name>dt.application.TwitterDemo.operator.wsResult.numRetries</name>
+    <value>2147483647</value>
+  </property>
+
+
+  <!-- WordCountDemo -->
+  <property>
+    <name>dt.application.WordCountDemo.operator.wordinput.fileName</name>
+    <value>samplefile.txt</value>
+  </property>
+  <property>
+    <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name>
+    <value>CONTAINER_LOCAL</value>
+    <description>Specify container locality for the viewtuplecount stream
+    </description>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a5914ce/demos/wordcount/src/main/resources/WordDataSchema.json
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/WordDataSchema.json b/demos/wordcount/src/main/resources/WordDataSchema.json
new file mode 100644
index 0000000..5e8e7c0
--- /dev/null
+++ b/demos/wordcount/src/main/resources/WordDataSchema.json
@@ -0,0 +1,4 @@
+{
+  "values": [{"name": "word", "type": "string"},
+             {"name": "count", "type": "integer"}]
+}


[2/2] incubator-apex-malhar git commit: Merge branch 'TopNWords' into devel-3

Posted by ti...@apache.org.
Merge branch 'TopNWords' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ae5f1ede
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ae5f1ede
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ae5f1ede

Branch: refs/heads/devel-3
Commit: ae5f1ede5260e59bb1c2943cbf62d26d58a69b24
Parents: d710af9 0a5914c
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Sep 24 13:56:31 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Sep 24 13:56:31 2015 -0700

----------------------------------------------------------------------
 demos/wordcount/pom.xml                         |   8 +
 .../wordcount/ApplicationWithQuerySupport.java  | 122 ++++++++
 .../demos/wordcount/FileWordCount.java          | 288 +++++++++++++++++++
 .../datatorrent/demos/wordcount/LineReader.java |  95 ++++++
 .../com/datatorrent/demos/wordcount/WCPair.java |  34 +++
 .../demos/wordcount/WindowWordCount.java        |  82 ++++++
 .../demos/wordcount/WordCountWriter.java        |  90 ++++++
 .../datatorrent/demos/wordcount/WordReader.java |  69 +++++
 .../src/main/resources/META-INF/properties.xml  |  88 +++++-
 .../src/main/resources/WordDataSchema.json      |   4 +
 10 files changed, 869 insertions(+), 11 deletions(-)
----------------------------------------------------------------------