You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:21 UTC

[16/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
deleted file mode 100644
index e8a91b2..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Computes word frequencies per file and globally, and writes the top N pairs to an output file
- * and to snapshot servers for visualization.
- * Currently designed to work with only 1 file at a time; will be enhanced later to support
- * multiple files dropped into the monitored directory at the same time.
- *
- * <p>
- * Receives per-window list of pairs (word, frequency) on the input port. When the end of a file
- * is reached, expects to get an EOF on the control port; at the next endWindow, the top N words
- * and frequencies are computed and emitted to the output ports.
- * <p>
- * There are 3 output ports: (a) One for the per-file top N counts emitted when the file is fully
- * read and is written to the output file. (b) One for the top N counts emitted per window for the
- * current file to the snapshot server and (c) One for the global top N counts emitted per window
- * to a different snapshot server.
- *
- * Since the EOF is received by a single operator, this operator cannot be partitionable
- *
- * @since 3.2.0
- */
-public class FileWordCount extends BaseOperator
-{
-  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
-  private static final String GLOBAL = "global";
-
-  /**
-   * If {@literal topN > 0}, at most topN of the most frequent words are output (all of them when
-   * fewer than topN distinct words exist); if topN == 0, the entire frequency map is output
-   */
-  protected int topN;
-
-  /**
-   * Set to true when an EOF control tuple for the current input file is received; reset to false
-   * when the corresponding output file has been written.
-   */
-  protected boolean eof = false;
-
-  /**
-   * Last component of path (just the file name)
-   * incoming value from control tuple
-   */
-  protected String fileName;
-
-  /**
-   * {@literal (word => frequency)} map: current file, all words
-   */
-  protected Map<String, WCPair> wordMapFile = new HashMap<>();
-
-  /**
-   * {@literal (word => frequency)} map: global, all words
-   */
-  protected Map<String, WCPair> wordMapGlobal = new HashMap<>();
-
-  /**
-   * Singleton list with per file data; sent on {@code outputPerFile}
-   */
-  protected transient List<Map<String, Object>> resultPerFile;
-
-  /**
-   * Singleton list with global data; sent on {@code outputGlobal}
-   */
-  protected transient List<Map<String, Object>> resultGlobal;
-
-  /**
-   * Singleton map of {@code fileName} to sorted list of (word, frequency) pairs
-   */
-  protected transient Map<String, Object> resultFileFinal;
-
-  /**
-   * final list of (word, frequency) pairs written to output file
-   */
-  protected transient List<WCPair> fileFinalList;
-
-  /**
-   * Input port on which per-window {@literal (word => frequency)} map is received; the map
-   * is merged into {@code wordMapFile} and {@code wordMapGlobal}.
-   */
-  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
-  {
-    @Override
-    public void process(List<WCPair> list)
-    {
-      // blend incoming list into wordMapFile and wordMapGlobal
-      for (WCPair pair : list) {
-        final String word = pair.word;
-        WCPair filePair = wordMapFile.get(word);
-        if (null != filePair) {    // word seen previously in current file
-          WCPair globalPair = wordMapGlobal.get(word);    // cannot be null
-          filePair.freq += pair.freq;
-          globalPair.freq += pair.freq;
-          continue;
-        }
-
-        // new word in current file
-        filePair = new WCPair(word, pair.freq);
-        wordMapFile.put(word, filePair);
-
-        // check global map
-        WCPair globalPair = wordMapGlobal.get(word);    // may be null
-        if (null != globalPair) {    // word seen previously
-          globalPair.freq += pair.freq;
-          continue;
-        }
-
-        // word never seen before
-        globalPair = new WCPair(word, pair.freq);
-        wordMapGlobal.put(word, globalPair);
-      }
-    }
-  };
-
-  /**
-   * Control port on which the current file name is received to indicate EOF
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
-  {
-    @Override
-    public void process(String msg)
-    {
-      if (msg.isEmpty()) {    // sanity check
-        throw new RuntimeException("Empty file path");
-      }
-      LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN);
-      fileName = msg;
-      eof = true;
-      // NOTE: current version only supports processing one file at a time.
-    }
-  };
-
-  /**
-   * Output port for current file output
-   */
-  public final transient DefaultOutputPort<List<Map<String, Object>>>
-      outputPerFile = new DefaultOutputPort<>();
-
-  /**
-   * Output port for global output
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<List<Map<String, Object>>>
-      outputGlobal = new DefaultOutputPort<>();
-
-  /**
-   * Tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} is the final
-   * top N pairs for current file and will be written to the output file; emitted in the
-   * {@code endWindow()} call after an EOF
-   */
-  public final transient DefaultOutputPort<Map<String, Object>>
-      fileOutput = new DefaultOutputPort<>();
-
-  /**
-   * Get the number of top (word, frequency) pairs that will be output
-   */
-  public int getTopN()
-  {
-    return topN;
-  }
-
-  /**
-   * Set the number of top (word, frequency) pairs that will be output
-   * @param n The new number
-   */
-  public void setTopN(int n)
-  {
-    topN = n;
-  }
-
-  /**
-   * {@inheritDoc}
-   * Initialize various map and list fields
-   */
-  @Override
-  public void setup(OperatorContext context)
-  {
-    if (null == wordMapFile) {
-      wordMapFile = new HashMap<>();
-    }
-    if (null == wordMapGlobal) {
-      wordMapGlobal = new HashMap<>();
-    }
-    resultPerFile = new ArrayList<>(1);
-    resultGlobal = new ArrayList<>(1);
-    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
-    resultFileFinal = new HashMap<>(1);
-    fileFinalList = new ArrayList<>();
-  }
-
-  /**
-   * {@inheritDoc}
-   * This is where we do most of the work:
-   * 1. Sort global map and emit top N pairs
-   * 2. Sort current file map and emit top N pairs
-   * 3. If we've seen EOF, emit top N pairs on port connected to file writer and clear all per-file
-   *    data structures.
-   */
-  @Override
-  public void endWindow()
-  {
-    LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN);
-
-    if (wordMapFile.isEmpty()) {    // no words found
-      // log first: the EOF reset below sets fileName to null
-      LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", fileName, topN);
-      if (eof) {                    // got EOF, so write empty list to fileOutput port
-        fileFinalList.clear();
-        resultFileFinal.put(fileName, fileFinalList);
-        fileOutput.emit(resultFileFinal);
-
-        // reset for next file
-        eof = false;
-        fileName = null;
-        resultFileFinal.clear();
-      }
-      return;
-    }
-
-    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", fileName, wordMapFile.size(), topN);
-
-    // per-window snapshots come first; the final per-file list is emitted further down on EOF
-
-    // get topN global list and emit to global output port
-    getTopNMap(wordMapGlobal, resultGlobal);
-    LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size());
-    outputGlobal.emit(resultGlobal);
-
-    // get topN list for this file and emit to file output port
-    getTopNMap(wordMapFile, resultPerFile);
-    LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size());
-    outputPerFile.emit(resultPerFile);
-
-    if (eof) {                     // got EOF earlier
-      if (null == fileName) {      // need file name to emit topN pairs to file writer
-        throw new RuntimeException("EOF but no fileName at endWindow");
-      }
-
-      // so compute final topN list from wordMapFile into fileFinalList and emit it
-      getTopNList(wordMapFile);
-      resultFileFinal.put(fileName, fileFinalList);
-      fileOutput.emit(resultFileFinal);
-
-      // reset for next file
-      eof = false;
-      fileName = null;
-      wordMapFile.clear();
-      resultFileFinal.clear();
-    }
-  }
-
-  /**
-   * Sort the pairs of {@code map} by descending frequency, keep at most topN of them (all if
-   * topN == 0) and store each as a {"word", "count"} map in {@code result}, replacing its
-   * previous contents. This form is suitable input to AppDataSnapshotServer
-   */
-  private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result)
-  {
-    final List<WCPair> list = new ArrayList<>(map.values());
-
-    // sort entries in descending order of frequency
-    Collections.sort(list, new Comparator<WCPair>()
-    {
-      @Override
-      public int compare(WCPair o1, WCPair o2)
-      {
-        return Integer.compare(o2.freq, o1.freq);
-      }
-    });
-
-    if (topN > 0 && topN < list.size()) {          // subList() throws if topN > size
-      list.subList(topN, list.size()).clear();     // retain only the first topN entries
-    }
-
-    // convert each pair (word, freq) of list to a map with 2 elements
-    // {("word": <word>, "count": freq)} and append to list
-    //
-    result.clear();
-    for (WCPair pair : list) {
-      Map<String, Object> wmap = new HashMap<>(2);
-      wmap.put("word", pair.word);
-      wmap.put("count", pair.freq);
-      result.add(wmap);
-    }
-    LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size());
-    list.clear();
-  }
-
-  /**
-   * Populate fileFinalList with the pairs of the argument sorted by descending frequency,
-   * keeping at most topN of them (all if topN == 0)
-   * This list is suitable input to WordCountWriter which writes it to a file
-   */
-  private void getTopNList(final Map<String, WCPair> map)
-  {
-    fileFinalList.clear();
-    fileFinalList.addAll(map.values());
-
-    // sort entries in descending order of frequency
-    Collections.sort(fileFinalList, new Comparator<WCPair>()
-    {
-      @Override
-      public int compare(WCPair o1, WCPair o2)
-      {
-        return Integer.compare(o2.freq, o1.freq);
-      }
-    });
-
-    if (topN > 0 && topN < fileFinalList.size()) {                  // subList() throws if topN > size
-      fileFinalList.subList(topN, fileFinalList.size()).clear();    // retain only the first topN entries
-    }
-    LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", fileFinalList.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
deleted file mode 100644
index 8a1a57b..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
-
-/**
- * Reads lines from input file and returns them. If EOF is reached, a control tuple
- * is emitted on the control port
- *
- * @since 3.2.0
- */
-public class LineReader extends AbstractFileInputOperator<String>
-{
-  private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
-
-  /**
-   * Port carrying every line read from the file currently being processed
-   */
-  public final transient DefaultOutputPort<String> output  = new DefaultOutputPort<>();
-
-  /**
-   * Optional port that receives a file's name once its last line has been read
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();
-
-  private transient BufferedReader br = null;
-
-  private Path path;
-
-  /**
-   * Records the path being opened and layers a line-oriented reader over its stream
-   * @param curPath The path to the file just opened
-   * @return the stream obtained from the superclass
-   */
-  @Override
-  protected InputStream openFile(Path curPath) throws IOException
-  {
-    LOG.info("openFile: curPath = {}", curPath);
-    path = curPath;
-    final InputStream stream = super.openFile(path);
-    br = new BufferedReader(new InputStreamReader(stream));
-    return stream;
-  }
-
-  /**
-   * Delegates to the superclass first, then closes the reader and forgets the current path
-   * @param is File input stream handed to the superclass for closing
-   */
-  @Override
-  protected void closeFile(InputStream is) throws IOException
-  {
-    super.closeFile(is);
-    br.close();
-    path = null;
-    br = null;
-  }
-
-  /**
-   * {@inheritDoc}
-   * Returns the next line, or null at EOF after announcing it on the control port if connected
-   */
-  @Override
-  protected String readEntity() throws IOException
-  {
-    final String next = br.readLine();
-    if (null == next) {
-      // end-of-file: when the control port is wired up, announce it with just the last
-      // component of the path (the bare file name)
-      if (control.isConnected()) {
-        LOG.info("readEntity: EOF for {}", path);
-        control.emit(path.getName());
-      }
-      return null;
-    }
-    // common case: hand the line back to the caller
-    LOG.debug("readEntity: line = {}", next);
-    return next;
-  }
-
-  /**
-   * Forwards a tuple to the {@code output} port
-   */
-  @Override
-  protected void emit(String tuple)
-  {
-    output.emit(tuple);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
deleted file mode 100644
index bb67622..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-/**
- * A single (word, frequency) pair
- *
- * @since 3.2.0
- */
-public class WCPair
-{
-  /**
-   * Text of the word being counted
-   */
-  public String word;
-
-  /**
-   * Number of occurrences recorded for {@code word}
-   */
-  public int freq;
-
-  /**
-   * Creates a pair whose fields keep their default values (null word, zero frequency)
-   */
-  public WCPair()
-  {
-    // intentionally empty
-  }
-
-  /**
-   * Creates a pair holding the given word and frequency
-   * @param w The word
-   * @param f The frequency
-   */
-  public WCPair(String w, int f)
-  {
-    this.freq = f;
-    this.word = w;
-  }
-
-  @Override
-  public String toString()
-  {
-    return String.format("(%s, %d)", this.word, this.freq);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
deleted file mode 100644
index 0edfd1e..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Computes word frequencies per window and emits them at each {@code endWindow()}. The output is a
- * list of (word, frequency) pairs
- *
- * @since 3.2.0
- */
-public class WindowWordCount extends BaseOperator
-{
-  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);
-
-  /**
-   * Tally for the current window, keyed by word; each value carries the word and its count
-   */
-  protected Map<String, WCPair> wordMap = new HashMap<>();
-
-  /**
-   * Input port receiving one word per tuple; every tuple raises that word's count in
-   * {@code wordMap} by one
-   */
-  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
-  {
-    @Override
-    public void process(String word)
-    {
-      final WCPair known = wordMap.get(word);
-      if (null == known) {
-        // first occurrence in this window: start the count at one
-        wordMap.put(word, new WCPair(word, 1));
-      } else {
-        // seen before: bump the existing entry in place
-        known.freq += 1;
-      }
-    }
-  };
-
-  /**
-   * Output port on which the window's (word, frequency) pairs are emitted as one list
-   */
-  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();
-
-  /**
-   * {@inheritDoc}
-   * Emits the pairs gathered during this window (if any) as a single list, then empties
-   * {@code wordMap} so the next window starts from scratch
-   */
-  @Override
-  public void endWindow()
-  {
-    LOG.info("WindowWordCount: endWindow");
-
-    if (!wordMap.isEmpty()) {
-      // copy the values into a list of their own, emit it, then reset both the copy and
-      // the map
-      final ArrayList<WCPair> pairs = new ArrayList<>(wordMap.values());
-      output.emit(pairs);
-      pairs.clear();
-      wordMap.clear();
-    }
-    // an empty map means no words arrived in this window, so nothing is emitted
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
deleted file mode 100644
index 3a88bab..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
-
-/**
- * <p>WordCountInputOperator class.</p>
- *
- * @since 0.3.2
- */
-public class WordCountInputOperator extends SimpleSinglePortInputOperator<String> implements Runnable
-{
-
-  private static final Logger logger = LoggerFactory.getLogger(WordCountInputOperator.class);
-  /** Mean pause, in milliseconds, after each line is emitted */
-  protected long averageSleep = 300;
-  /** Largest random deviation, in milliseconds, applied to {@code averageSleep} */
-  protected long sleepPlusMinus = 100;
-  /** Classpath resource that is read repeatedly as the source of words */
-  protected String fileName = "com/datatorrent/demos/wordcount/samplefile.txt";
-
-  /** @param as new mean pause between lines, in milliseconds */
-  public void setAverageSleep(long as)
-  {
-    averageSleep = as;
-  }
-
-  /** @param spm new largest random deviation of the pause, in milliseconds */
-  public void setSleepPlusMinus(long spm)
-  {
-    sleepPlusMinus = spm;
-  }
-
-  /** @param fn classpath-relative name of the resource to read */
-  public void setFileName(String fn)
-  {
-    fileName = fn;
-  }
-
-  /**
-   * Reads the resource named by {@code fileName} over and over, emitting every word in lower
-   * case on the output port and pausing for a randomized interval after each line.
-   * The loop ends when the thread is interrupted; the interrupt flag is restored before
-   * returning so callers can still observe it.
-   *
-   * @throws IllegalStateException if the resource cannot be found on the classpath
-   */
-  @Override
-  public void run()
-  {
-    while (!Thread.currentThread().isInterrupted()) {
-      final InputStream fstream = this.getClass().getClassLoader().getResourceAsStream(fileName);
-      if (fstream == null) {
-        // fail with a clear message instead of a NullPointerException on the first read
-        throw new IllegalStateException("Resource not found on classpath: " + fileName);
-      }
-
-      // closing the reader also closes the wrapped streams, even when reading fails
-      try (BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(fstream)))) {
-        String line;
-        while ((line = br.readLine()) != null) {
-          String[] words = line.trim().split("[\\p{Punct}\\s\\\"\\'\u201c\u201d]+");
-          for (String word : words) {
-            word = word.trim().toLowerCase();
-            if (!word.isEmpty()) {
-              outputPort.emit(word);
-            }
-          }
-          // jitter is a random offset of up to sleepPlusMinus either way; clamp the total so
-          // sleep() never sees a negative duration, which would throw IllegalArgumentException
-          final long jitter = (long)(sleepPlusMinus * (Math.random() * 2 - 1));
-          Thread.sleep(Math.max(0L, averageSleep + jitter));
-        }
-      } catch (IOException ex) {
-        logger.debug(ex.toString());
-      } catch (InterruptedException ex) {
-        // stop reading instead of swallowing the interrupt and looping forever
-        Thread.currentThread().interrupt();
-        return;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
deleted file mode 100644
index 30aab10..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.io.UnsupportedEncodingException;
-
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
-
-/**
- * Write top N words and their frequencies to a file
- *
- * @since 3.2.0
- */
-public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
-{
-  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
-  private static final String charsetName = "UTF-8";
-  private static final String nl = System.lineSeparator();
-
-  private String fileName;    // name taken from the most recent tuple; null until one arrives
-  private final transient StringBuilder sb = new StringBuilder();
-
-  /**
-   * {@inheritDoc}
-   * Once a file name is known, calls requestFinalize() for it before delegating to the
-   * superclass.
-   */
-  @Override
-  public void endWindow()
-  {
-    final boolean haveName = (fileName != null);
-    if (haveName) {
-      requestFinalize(fileName);
-    }
-    super.endWindow();
-  }
-
-  /**
-   * Remembers and returns the key of the tuple's first entry as the output file name
-   * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs}
-   * @return the file name to write the tuple to
-   */
-  @Override
-  protected String getFileName(Map<String, Object> tuple)
-  {
-    LOG.info("getFileName: tuple.size = {}", tuple.size());
-
-    final String key = tuple.entrySet().iterator().next().getKey();
-    fileName = key;
-    LOG.info("getFileName: fileName = {}", fileName);
-    return key;
-  }
-
-  /**
-   * Renders the tuple's list of pairs as text, one {@code word : frequency} line per pair
-   * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs}
-   * @return the rendered text encoded with {@code charsetName}
-   */
-  @Override
-  protected byte[] getBytesForTuple(Map<String, Object> tuple)
-  {
-    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
-
-    // only the value of the first (and only) entry matters here; its key is the file
-    // name and is ignored
-    final Object value = tuple.entrySet().iterator().next().getValue();
-    final List<WCPair> pairs = (List<WCPair>)value;
-
-    sb.setLength(0);    // start from an empty buffer
-
-    for (WCPair item : pairs) {
-      sb.append(item.word).append(" : ").append(item.freq).append(nl);
-    }
-
-    final String text = sb.toString();
-    LOG.info("getBytesForTuple: data = {}", text);
-
-    final byte[] encoded;
-    try {
-      encoded = text.getBytes(charsetName);
-    } catch (UnsupportedEncodingException ex) {
-      throw new RuntimeException("Should never get here", ex);
-    }
-    return encoded;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
deleted file mode 100644
index 58c44b4..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.util.regex.Pattern;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Operator that breaks each incoming line into words and emits the words one at a time.
- * The separator between words is a regular expression that can be set with
- * {@link #setNonWordStr}; when none is set, runs of punctuation and whitespace are used.
- *
- * @since 3.3.0
- */
-public class WordReader extends BaseOperator
-{
-  // separator pattern used when no regex has been configured
-  private static final Pattern DEFAULT_NON_WORD = Pattern.compile("[\\p{Punct}\\s]+");
-
-  private String nonWordStr;              // separator regex as configured; null if unset
-  private transient Pattern nonWord;      // separator pattern in use, chosen in setup()
-
-  /**
-   * Output port on which the individual words are emitted
-   */
-  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-
-  /**
-   * Input port on which lines of text are received
-   */
-  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
-  {
-
-    @Override
-    public void process(String line)
-    {
-      // cut the line at every separator match and emit the non-empty pieces
-      for (String piece : nonWord.split(line)) {
-        if (!piece.isEmpty()) {
-          output.emit(piece);
-        }
-      }
-    }
-  };
-
-  /**
-   * Returns the configured word-separator regular expression
-   * @return Regular expression matching the text between words, or null if none was set
-   */
-  public String getNonWordStr()
-  {
-    return this.nonWordStr;
-  }
-
-  /**
-   * Sets the word-separator regular expression; it is compiled when {@link #setup} runs
-   * @param regex New regular expression matching the text between words
-   */
-  public void setNonWordStr(String regex)
-  {
-    this.nonWordStr = regex;
-  }
-
-  /**
-   * {@inheritDoc}
-   * Compiles the configured separator regex, or falls back to the default pattern if none was set
-   */
-  @Override
-  public void setup(OperatorContext context)
-  {
-    nonWord = (null == nonWordStr) ? DEFAULT_NON_WORD : Pattern.compile(nonWordStr);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg
deleted file mode 100644
index 054baed..0000000
Binary files a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg and /dev/null differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java
deleted file mode 100644
index d00397d..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Streaming word count demonstration application.
- */
-package com.datatorrent.demos.wordcount;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/META-INF/properties.xml b/demos/wordcount/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 1d3594e..0000000
--- a/demos/wordcount/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,98 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <!-- TopNWordsWithQueries -->
-
-  <!-- for debugging -->
-  <!--
-  <property>
-    <name>dt.attr.CONTAINER_JVM_OPTIONS</name>
-    <value>-Dlog4j.configuration=my_log4j.properties</value>
-  </property>
-  -->
-
-  <!-- monitored input directory -->
-  <property>
-    <name>dt.application.TopNWordsWithQueries.operator.lineReader.directory</name>
-    <value>/tmp/test/input-dir</value>
-  </property>
-
-  <!-- regular expression for word separator -->
-  <property>
-    <name>dt.application.TopNWordsWithQueries.operator.wordReader.nonWordStr</name>
-    <value>[\p{Punct}\s]+</value>
-  </property>
-
-  <!-- output directory for word counts -->
-  <property>
-    <name>dt.application.TopNWordsWithQueries.operator.wcWriter.filePath</name>
-    <value>/tmp/test/output-dir</value>
-  </property>
-
-  <!-- Top N value -->
-  <property>
-    <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.topN</name>
-    <value>10</value>
-  </property>
-
-  <!-- topic for queries (current file) -->
-  <property>
-    <name>dt.application.TopNWordsWithQueries.operator.snapshotServerFile.embeddableQueryInfoProvider.topic</name>
-    <value>TopNWordsQueryFile</value>
-  </property>
-
-  <!-- topic for query results (current file)  -->
-  <property>
-    <name>dt.application.TopNWordsWithQueries.operator.wsResultFile.topic</name>
-    <value>TopNWordsQueryFileResult</value>
-  </property>
-
-  <!-- topic for queries (global) -->
-  <property>
-    <name>dt.application.TopNWordsWithQueries.operator.snapshotServerGlobal.embeddableQueryInfoProvider.topic</name>
-    <value>TopNWordsQueryGlobal</value>
-  </property>
-
-  <!-- topic for query results (global)  -->
-  <property>
-    <name>dt.application.TopNWordsWithQueries.operator.wsResultGlobal.topic</name>
-    <value>TopNWordsQueryGlobalResult</value>
-  </property>
-
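-  <!-- retry count; NOTE(review): the name below targets TwitterDemo, which is not an application configured in this file - confirm it is intended -->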
-  <property>
-    <name>dt.application.TwitterDemo.operator.wsResult.numRetries</name>
-    <value>2147483647</value>
-  </property>
-
-
-  <!-- WordCountDemo -->
-  <property>
-    <name>dt.application.WordCountDemo.operator.wordinput.fileName</name>
-    <value>samplefile.txt</value>
-  </property>
-  <property>
-    <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name>
-    <value>CONTAINER_LOCAL</value>
-    <description>Specify container locality for the wordinput.count stream
-    </description>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/WordDataSchema.json
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/WordDataSchema.json b/demos/wordcount/src/main/resources/WordDataSchema.json
deleted file mode 100644
index 5e8e7c0..0000000
--- a/demos/wordcount/src/main/resources/WordDataSchema.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
-  "values": [{"name": "word", "type": "string"},
-             {"name": "count", "type": "integer"}]
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt b/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt
deleted file mode 100644
index 83eaaed..0000000
--- a/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt
+++ /dev/null
@@ -1 +0,0 @@
-CONTENT DELETED

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/samplefile.txt
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/samplefile.txt b/demos/wordcount/src/main/resources/samplefile.txt
deleted file mode 100644
index 02a5e70..0000000
--- a/demos/wordcount/src/main/resources/samplefile.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-CONTENT DELETED
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/site/conf/my-app-conf1.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/site/conf/my-app-conf1.xml b/demos/wordcount/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index f35873b..0000000
--- a/demos/wordcount/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java b/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
deleted file mode 100644
index 1df0459..0000000
--- a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-
-/**
- * Smoke test that builds the {@link Application} DAG using the settings in
- * {@code dt-site-wordcount.xml} and runs it in local mode.
- */
-public class ApplicationTest
-{
-  // one logger per class is enough; it does not need to be created for every test instance
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-
-  /**
-   * Prepares the application in local mode, runs it and logs how long the run took.
-   *
-   * @throws Exception if preparing or running the application fails
-   */
-  @Test
-  public void testSomeMethod() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    // start without the default resources so only the test resource contributes settings
-    Configuration conf = new Configuration(false);
-    conf.addResource("dt-site-wordcount.xml");
-    lma.prepareDAG(new Application(), conf);
-    LocalMode.Controller lc = lma.getController();
-    long start = System.currentTimeMillis();
-    // presumably a run-time limit in milliseconds - TODO confirm against LocalMode.Controller
-    lc.run(300000);
-    long end = System.currentTimeMillis();
-    // placeholder form avoids building the message when debug logging is disabled
-    LOG.debug("Test used {} ms", end - start);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/resources/dt-site-wordcount.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/test/resources/dt-site-wordcount.xml b/demos/wordcount/src/test/resources/dt-site-wordcount.xml
deleted file mode 100644
index a25dac4..0000000
--- a/demos/wordcount/src/test/resources/dt-site-wordcount.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-	<property>
-		<name>dt.application.WordCountDemo.class</name>
-		<value>com.datatorrent.demos.wordcount.Application</value>
-		<description>An alias for the application</description>
-	</property>
-	<property>
-		<name>dt.application.WordCountDemo.operator.wordinput.fileName</name>
-		<value>samplefile.txt</value>
-	</property>
-	<property>
-		<name>dt.application.WordCountDemo.stream.wordinput.count.locality</name>
-		<value>CONTAINER_LOCAL</value>
-		<description>Specify container locality for the wordinput.count stream
-		</description>
-	</property>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/test/resources/log4j.properties b/demos/wordcount/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/wordcount/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/pom.xml
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/pom.xml b/demos/yahoofinance/pom.xml
deleted file mode 100644
index 819a475..0000000
--- a/demos/yahoofinance/pom.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>yahoo-finance-demo</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Apex Malhar Yahoo! Finance Demo</name>
-  <description>Apex demo applications that get Yahoo finance feed and calculate minute price range, minute volume and simple moving average.</description>
-
-  <parent>
-    <groupId>org.apache.apex</groupId>
-    <artifactId>malhar-demos</artifactId>
-    <version>3.7.0-SNAPSHOT</version>
-  </parent>
-
-  <properties>
-    <skipTests>true</skipTests>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>net.sf.opencsv</groupId>
-      <artifactId>opencsv</artifactId>
-      <version>2.0</version>
-    </dependency>    
-    <dependency>
-      <groupId>org.apache.derby</groupId>
-      <artifactId>derby</artifactId>
-      <version>10.9.1.0</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.apex</groupId>
-      <artifactId>malhar-contrib</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/assemble/appPackage.xml b/demos/yahoofinance/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/yahoofinance/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>appPackage</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/target/</directory>
-      <outputDirectory>/app</outputDirectory>
-      <includes>
-        <include>${project.artifactId}-${project.version}.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/target/deps</directory>
-      <outputDirectory>/lib</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/site/conf</directory>
-      <outputDirectory>/conf</outputDirectory>
-      <includes>
-        <include>*.xml</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/META-INF</directory>
-      <outputDirectory>/META-INF</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/app</directory>
-      <outputDirectory>/app</outputDirectory>
-    </fileSet>
-  </fileSets>
-
-</assembly>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
deleted file mode 100644
index 1a38495..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator;
-import org.apache.apex.malhar.contrib.misc.streamquery.DerbySqlStreamOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-
-/**
- * Application that joins two Yahoo finance CSV feeds in a Derby SQL operator to compute the
- * PE ratio and the PB ratio per ticker symbol, and sends the resulting rows to the console.
- * <p>
- * The ticker symbols are read as a comma separated list from the configuration key made of
- * this class name followed by {@code .tickerSymbols}; a built-in list is used when it is absent.
- *
- * @since 0.3.2
- */
-@ApplicationAnnotation(name = "YahooFinanceWithDerbySQLDemo")
-public class ApplicationWithDerbySQL implements StreamingApplication
-{
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    String tickerKey = ApplicationWithDerbySQL.class.getName() + ".tickerSymbols";
-    String[] tickers = conf.get(tickerKey, "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM").split(",");
-
-    YahooFinanceCSVInputOperator tradeFeed = dag.addOperator("input1", new YahooFinanceCSVInputOperator());
-    YahooFinanceCSVInputOperator ratioFeed = dag.addOperator("input2", new YahooFinanceCSVInputOperator());
-    DerbySqlStreamOperator sql = dag.addOperator("sqlOper", new DerbySqlStreamOperator());
-    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
-
-    // both feeds request the same symbols; they differ only in the fields they ask for
-    for (String ticker : tickers) {
-      tradeFeed.addSymbol(ticker);
-      ratioFeed.addSymbol(ticker);
-    }
-    tradeFeed.addFormat("s0");
-    tradeFeed.addFormat("l1");
-    ratioFeed.addFormat("s0");
-    ratioFeed.addFormat("e0");
-    ratioFeed.addFormat("b4");
-
-    // table t1: symbol and last trade
-    AbstractSqlStreamOperator.InputSchema tradeSchema = new AbstractSqlStreamOperator.InputSchema("t1");
-    tradeSchema.setColumnInfo("s0", "varchar(100)", true); // symbol
-    tradeSchema.setColumnInfo("l1", "float", false);  // last trade
-    sql.setInputSchema(0, tradeSchema);
-
-    // table t2: symbol, EPS and book value
-    AbstractSqlStreamOperator.InputSchema ratioSchema = new AbstractSqlStreamOperator.InputSchema("t2");
-    ratioSchema.setColumnInfo("s0", "varchar(100)", true); // symbol
-    ratioSchema.setColumnInfo("e0", "float", false);  // EPS
-    ratioSchema.setColumnInfo("b4", "float", false);  // Book value
-    sql.setInputSchema(1, ratioSchema);
-
-    // join the two tables on the symbol and derive PE ratio and PB ratio from the last trade
-    sql.addExecStatementString("SELECT SESSION.t1.s0 AS symbol, SESSION.t1.l1 / SESSION.t2.e0 AS pe_ratio, SESSION.t1.l1 / SESSION.t2.b4 AS pb_ratio FROM SESSION.t1,SESSION.t2 WHERE SESSION.t1.s0 = SESSION.t2.s0");
-
-    dag.addStream("input1_sql", tradeFeed.outputPort, sql.in1);
-    dag.addStream("input2_sql", ratioFeed.outputPort, sql.in2);
-
-    dag.addStream("result_console", sql.result, console.input);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
deleted file mode 100644
index 01e3ce9..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.cookie.CookiePolicy;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.DefaultHttpParams;
-import org.apache.hadoop.util.StringUtils;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.KeyValPair;
-
-import au.com.bytecode.opencsv.CSVReader;
-
-/**
- * Input operator that requests the Yahoo Finance CSV quote URL built by prepareURL() and emits, per symbol, the price,
- * the incremental volume (total volume on the first tick) and the last traded time on three separate output ports.
- * @since 0.3.2
- */
-public class StockTickInput implements InputOperator
-{
-  private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);
-  /**
-   * Pause in milliseconds at the end of emitTuples(); it is passed to Thread.sleep and is not used as an HTTP timeout.
-   */
-  public int readIntervalMillis = 500;
-  /**
-   * The quote URL requested with an HTTP GET; assigned from prepareURL() in setup().
-   */
-  private String url;
-  private String[] symbols; // assigned only in setTickers(): the tickers string split on ','
-  @NotNull
-  private String tickers;
-  private transient HttpClient client;
-  private transient GetMethod method;
-  private HashMap<String, Long> lastVolume = new HashMap<String, Long>(); // total volume at the last emit, keyed by symbol
-  private boolean outputEvenIfZeroVolume = false; // when true, emit even if the volume delta is not positive
-  /**
-   * The output port to emit price.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>();
-  /**
-   * The output port to emit incremental volume.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>();
-  /**
-   * The output port to emit last traded time.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>();
-
-  /**
-   * Prepare URL from symbols and the fixed format sl1vt1. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1&e=.csv
-   *
-   * @return the URL
-   */
-  private String prepareURL()
-  {
-    String str = "http://download.finance.yahoo.com/d/quotes.csv?s=";
-    for (int i = 0; i < symbols.length; i++) { // NOTE(review): NullPointerException here if setTickers() was never called
-      if (i != 0) {
-        str += ",";
-      }
-      str += symbols[i]; // symbols are appended as-is, without URL encoding
-    }
-    str += "&f=sl1vt1&e=.csv";
-    return str;
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    url = prepareURL();
-    client = new HttpClient();
-    method = new GetMethod(url); // this single GetMethod instance is re-executed on every emitTuples() call
-    DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY); // NOTE(review): looks like this changes shared default params, not only this client — confirm
-  }
-
-  @Override
-  public void teardown() // NOTE(review): nothing is released here; method.releaseConnection() is never called in this class
-  {
-  }
-
-  @Override
-  public void emitTuples()
-  {
-
-    try {
-      int statusCode = client.executeMethod(method);
-      if (statusCode != HttpStatus.SC_OK) {
-        logger.error("Method failed: " + method.getStatusLine());
-      } else {
-        InputStream istream = method.getResponseBodyAsStream(); // NOTE(review): stream and readers below are never closed
-        // Process response
-        InputStreamReader isr = new InputStreamReader(istream); // no charset given, so the platform default is used
-        CSVReader reader = new CSVReader(isr);
-        List<String[]> myEntries = reader.readAll();
-        for (String[] stringArr: myEntries) {
-          ArrayList<String> tuple = new ArrayList<String>(Arrays.asList(stringArr));
-          if (tuple.size() != 4) {
-            return; // NOTE(review): one malformed row ends the whole call; remaining rows and the sleep below are skipped
-          }
-          // input csv is <Symbol>,<Price>,<Volume>,<Time>
-          String symbol = tuple.get(0);
-          double currentPrice = Double.valueOf(tuple.get(1)); // NOTE(review): NumberFormatException is not caught below
-          long currentVolume = Long.valueOf(tuple.get(2));
-          String timeStamp = tuple.get(3);
-          long vol = currentVolume;
-          // Sends total volume in first tick, and incremental volume afterwards.
-          if (lastVolume.containsKey(symbol)) {
-            vol -= lastVolume.get(symbol);
-          }
-
-          if (vol > 0 || outputEvenIfZeroVolume) {
-            price.emit(new KeyValPair<String, Double>(symbol, currentPrice));
-            volume.emit(new KeyValPair<String, Long>(symbol, vol));
-            time.emit(new KeyValPair<String, String>(symbol, timeStamp));
-            lastVolume.put(symbol, currentVolume); // NOTE(review): only updated on emit, so if the total drops nothing is emitted until it passes the old value
-          }
-        }
-      }
-      Thread.sleep(readIntervalMillis); // runs after both the error and the success branch
-    } catch (InterruptedException ex) {
-      logger.debug(ex.toString()); // NOTE(review): the interrupt status is not restored
-    } catch (IOException ex) {
-      logger.debug(ex.toString()); // I/O failures are only visible at debug level; the sleep is skipped on this path
-    }
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-  }
-
-  @Override
-  public void endWindow()
-  {
-  }
-
-  public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume)
-  {
-    this.outputEvenIfZeroVolume = outputEvenIfZeroVolume;
-  }
-
-  public void setTickers(String tickers)
-  {
-    this.tickers = tickers;
-    symbols = StringUtils.split(tickers, ','); // keeps symbols in sync with tickers; the URL itself is only rebuilt in setup()
-  }
-
-  public String getTickers()
-  {
-    return tickers;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
deleted file mode 100644
index a6aaece..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.math.RangeKeyVal;
-import com.datatorrent.lib.math.SumKeyVal;
-import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
-import com.datatorrent.lib.stream.ConsolidatorKeyVal;
-import com.datatorrent.lib.util.BaseKeyValueOperator.DefaultPartitionCodec;
-import com.datatorrent.lib.util.HighLow;
-
-/**
- * Yahoo! Finance Application Demo :<br>
- * Get Yahoo finance feed and calculate minute price range, minute volume,
- * simple moving average of 5 minutes. <br>
- * <br>
- * Functional Description : <br>
- * Application samples yahoo finance ticker every 200ms. All data points in one
- * second are streamed from input adapter. <br>
- * <br>
- *
- * Application calculates following Real Time Value(s):<br>
- * <ul>
- * <li>Quotes for IBM, Google, Apple, Yahoo stocks price/volume/time displayed
- * every second.</li>
- * <li>Charts for stocks in terms of high/low price vs. volume for the last minute.</li>
- * <li>Simple moving average over last 5 minutes for IBM, Google, Apple, Yahoo
- * stocks.</li>
- * </ul>
- * <br>
- * <br>
- *
- * Custom Attribute : <br>
- * <ul>
- * <li>Application streaming window size(STREAMING_WINDOW_SIZE_MILLIS) = 1 sec,
- * since we are only interested in quotes every second.</li>
- * <li>Range/Minute Volume operator's window size(APPLICATION_WINDOW_COUNT) =
- * 60, aggregate over one minute.</li>
- * <li>Sum operator window length : 300, sliding average over last 5 minutes.</li>
- * </ul>
- * <br>
- *
- * Input Adapter : <br>
- * The StockTickInput operator gets real-time stock quote data from Yahoo Finance and
- * pushes it into the application. <br>
- * <br>
- *
- * Output Adapter : <br>
- * Output values are written to the console through ConsoleOutputOperator.<br>
- * If you need to write to HDFS, HTTP, etc. instead of the console, <br>
- * Please refer to {@link com.datatorrent.lib.io.HttpOutputOperator} or
- * {@link com.datatorrent.lib.io.fs.HdfsOutputOperator}. <br>
- * <br>
- *
- * Run Sample Application : <br>
- * <p>
- * Running Java Test or Main app in IDE:
- *
- * <pre>
- * LocalMode.runApp(new Application(), 600000); // 10 min run
- * </pre>
- *
- * Run Success : <br>
- * For successful deployment and run, user should see following output on
- * console:
- *
- * <pre>
- *  Price SMA: AAPL=435.965
- * Price SMA: GOOG=877.0
- * QUOTE: {YHOO=[26.37, 9760360, 4:00pm, null, null], IBM=[203.77, 2899698, 4:00pm, null, null], GOOG=[877.0, 2069614, 4:00pm, null, null], AAPL=[435.965, 10208099, 4:00pm, null, null]}
- * Price SMA: YHOO=26.37
- * </pre>
- *
- * Scaling Options : <br>
- * <ul>
- * <li>Volume operator can be replicated using a {@link StatelessPartitioner}
- * on an operator.</li>
- * <li>Range value operator can be replicated, but only with a proper unifier
- * operator (read the App Dev Guide).</li>
- * <li>Sliding window operator can be replicated with a proper unifier operator.</li>
- * </ul>
- * <br>
- *
- * Application DAG : <br>
- * <img src="doc-files/Application.gif" width=600px > <br>
- * <br>
- *
- * Streaming Window Size : 1000 ms(1 Sec) <br>
- * Operator Details : <br>
- * <ul>
- * <li>
- * <p>
- * <b>The operator DailyVolume:</b> This operator reads from the input port,
- * which contains the incremental volume tuples from StockTickInput, and
- * aggregates the data to provide the cumulative volume. It just utilizes the
- * library class SumKeyVal<K,V> provided in math package. In this case,
- * SumKeyVal<String,Long>, where K is the stock symbol, V is the aggregated
- * volume, with cumulative set to true. (Otherwise if cumulative was set to
- * false, SumKeyVal would provide the sum for the application window.) The platform
- * provides a number of built-in operators for simple operations like this so
- * that application developers do not have to write them. More examples to
- * follow. This operator assumes that the application restarts before market
- * opens every day.
- * </p>
- * Class : {@link com.datatorrent.lib.math.SumKeyVal} <br>
- * Operator Application Window Count : 1 <br>
- * StateFull : Yes, volume gets aggregated every window count.</li>
- *
- * <li>
- * <p>
- * <b>The operator MinuteVolume:</b> This operator reads from the input port,
- * which contains the volume tuples from StockTickInput, and aggregates the data
- * to provide the sum of the volume within one minute. Like the operator
- * DailyVolume, this operator is also SumKeyVal<String,Long>, but with
- * cumulative set to false. Application Window is set to 1 minute. We will
- * explain how to set this later. <br>
- * Class : {@link com.datatorrent.lib.math.SumKeyVal} <br>
- * Operator App Window Count : 60 (1 Minute) <br>
- * StateFull : Yes, aggregate over last 60 windows.</li>
- *
- * <li>
- * <p>
- * <b>The operator Quote:</b> This operator has three input ports, which are
- * price (from StockTickInput), daily_vol (from Daily Volume), and time (from
- * StockTickInput). This operator just consolidates the three inputs and emits
- * the consolidated data. It utilizes the class ConsolidatorKeyVal<K> from
- * stream package.<br>
- * Class : {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} <br>
- * Operator App Window Count : 1 <br>
- * StateFull : No</li>
- *
- * <li>
- * <p>
- * <b>The operator Chart:</b> This operator is very similar to the operator
- * Quote, except that it takes inputs from High Low and Minute Vol and outputs
- * the consolidated tuples to the output port. <br>
- * Class : {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} <br>
- * StateFull : No<br>
- * Operator App Window Count : 1</li>
- *
- *
- * <li>
- * <p>
- * <b>The operator PriceSMA:</b> SMA stands for - Simple Moving Average. It
- * reads from the input port, which contains the price tuples from
- * StockTickInput, and provides the moving average price of the stock. It
- * utilizes SimpleMovingAverage<String,Double>, which is provided in multiwindow
- * package. SimpleMovingAverage keeps track of the data of the previous N
- * application windows in a sliding manner. For each end window event, it
- * provides the average of the data in those application windows. <br>
- * Class : {@link com.datatorrent.lib.multiwindow.SimpleMovingAverage} <br>
- * StateFull : Yes, stores values across application window. <br>
- * Operator App Window : 1 <br>
- * Operator Sliding Window : 300 (5 mins).</li>
- *
- * <li>
- * <p>
- * <b>The operator Console: </b> This operator just outputs the input tuples to
- * the console (or stdout). In this example, there are three console operators,
- * which connect to the output of Quote, Chart and PriceSMA. In
- * practice, they should be replaced by operators which would do something about
- * the data, like drawing charts. </li>
- *
- * </ul>
- * <br>
- *
- * @since 0.3.2
- */
-@ApplicationAnnotation(name = "YahooFinanceDemo")
-public class YahooFinanceApplication implements StreamingApplication
-{
-  protected int streamingWindowSizeMilliSeconds = 1000; // 1 second; stored as DAG.STREAMING_WINDOW_SIZE_MILLIS in populateDAG
-  protected int appWindowCountMinute = 60;   // 60 streaming windows of 1 second = 1 minute
-  protected int appWindowCountSMA = 300;  // 300 streaming windows of 1 second = 5 minutes
-  //protected String[] tickers = {"IBM", "GOOG", "AAPL", "YHOO"};
-
-  /**
-   * Instantiate stock input operator for actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded time.
-   * @param name  Operator name
-   * @param dag   Application DAG graph.
-   * @return StockTickInput instance.
-   */
-  public StockTickInput getStockTickInputOperator(String name, DAG dag)
-  {
-    StockTickInput oper = dag.addOperator(name, StockTickInput.class); // tickers are not set here; presumably supplied through configuration — confirm
-    oper.readIntervalMillis = 200; // pause of 200 ms between polls
-    //oper.symbols = tickers;
-    return oper;
-  }
-
-  /**
-   * Instantiate {@link com.datatorrent.lib.math.SumKeyVal} operator
-   * that sends the total daily volume by adding up the volumes from each tick (cumulative is set to true).
-   * @param name  Operator name
-   * @param dag   Application DAG graph.
-   * @return SumKeyVal instance.
-   */
-  public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag)
-  {
-    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
-    oper.setType(Long.class);
-    oper.setCumulative(true);
-    return oper;
-  }
-
-  /**
-   * Instantiate {@link com.datatorrent.lib.math.SumKeyVal} operator
-   * Get aggregated volume of 1 minute and send at the end window of 1 minute.
-   * @param name  Operator name
-   * @param dag   Application DAG graph.
-   * @param appWindowCount Value stored as the operator's APPLICATION_WINDOW_COUNT attribute.
-   * @return SumKeyVal instance.
-   */
-  public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)
-  {
-    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
-    oper.setType(Long.class);
-    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
-    return oper;
-  }
-
-  /**
-   * Instantiate {@link com.datatorrent.lib.math.RangeKeyVal} operator to get high/low
-   * value for each key within given application window.
-   * Get High-low range for 1 minute.
-   * @param name  Operator name
-   * @param dag   Application DAG graph.
-   * @param appWindowCount Value stored as the operator's APPLICATION_WINDOW_COUNT attribute.
-   * @return RangeKeyVal instance.
-   */
-  public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount)
-  {
-    RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>());
-    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
-    oper.setType(Double.class);
-    return oper;
-  }
-
-  /**
-   * Instantiate {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} to send
-   * Quote (Merge price, daily volume, time)
-   * @param name  Operator name
-   * @param dag   Application DAG graph.
-   * @return ConsolidatorKeyVal instance.
-   */
-  public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag)
-  {
-    ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>());
-    return oper;
-  }
-
-  /**
-   * Instantiate {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} to send
-   * Chart (Merge minute volume and minute high-low)
-   * @param name  Operator name
-   * @param dag   Application DAG graph.
-   * @return ConsolidatorKeyVal instance.
-   */
-  public ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> getChartOperator(String name, DAG dag)
-  {
-    ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow<Double>,Long,Object,Object,Object>());
-    return oper;
-  }
-
-  /**
-   * Instantiate {@link com.datatorrent.lib.multiwindow.SimpleMovingAverage} to calculate moving average for price
-   * over given window size. No APPLICATION_WINDOW_COUNT attribute is set for this operator.
-   * @param name  Operator name
-   * @param dag   Application DAG graph.
-   * @param appWindowCount Number of windows passed to setWindowSize().
-   * @return SimpleMovingAverage instance.
-   */
-  public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)
-  {
-    SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>());
-    oper.setWindowSize(appWindowCount);
-    oper.setType(Double.class);
-    return oper;
-  }
-
-  /**
-   * Add a ConsoleOutputOperator whose string format is prefix + ": %s" and return its input port.
-   * @param name  Operator name
-   * @param dag   Application DAG graph.
-   * @return input port for console output.
-   */
-  public InputPort<Object> getConsole(String name, /*String nodeName,*/ DAG dag, String prefix)
-  {
-    // hack to output to HTTP based on actual environment
-        /*
-     String serverAddr = System.getenv("MALHAR_AJAXSERVER_ADDRESS");
-     if (serverAddr != null) {
-     HttpOutputOperator<Object> oper = dag.addOperator(name, new HttpOutputOperator<Object>());
-     oper.setResourceURL(URI.create("http://" + serverAddr + "/channel/" + nodeName));
-     return oper.input;
-     }
-     */
-
-    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
-    oper.setStringFormat(prefix + ": %s");
-    return oper.input;
-  }
-
-  /**
-   * Populate Yahoo Finance Demo Application DAG: one tick input, quote/chart consolidators, a price SMA and three consoles.
-   */
-  @SuppressWarnings("unchecked") // presumably for the generic varargs addStream calls below — confirm
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-
-    dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, streamingWindowSizeMilliSeconds);
-
-    StockTickInput tick = getStockTickInputOperator("StockTickInput", dag);
-    SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag);
-    ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag);
-
-    RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute);
-    SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute);
-    ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> chartOperator = getChartOperator("Chart", dag);
-
-    SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA);
-    DefaultPartitionCodec<String, Double> codec = new DefaultPartitionCodec<String, Double>(); // the same codec instance is set on both price input ports below
-    dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);
-    dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);
-    dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data); // price fans out to quote, high/low range and moving average
-    dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data); // incremental volume feeds both the cumulative and the per-minute sum
-    dag.addStream("time", tick.time, quoteOperator.in3);
-    dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2);
-
-    dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE"));
-
-    dag.addStream("high_low", highlow.range, chartOperator.in1);
-    dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2);
-    dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART"));
-
-    dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA"));
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
deleted file mode 100644
index cf3801e..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.cookie.CookiePolicy;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.DefaultHttpParams;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
-
-import au.com.bytecode.opencsv.CSVReader;
-
-/**
- * Fetches Yahoo Finance quotes as CSV and emits one HashMap per CSV row, keyed by the format names added with addFormat (e.g. "s0"). <p>
- *
- * @since 0.3.2
- */
-public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<HashMap<String, Object>> implements Runnable
-{
-  private static final Logger logger = LoggerFactory.getLogger(YahooFinanceCSVInputOperator.class);
-  /**
-   * Pause in milliseconds at the end of each iteration in run(); it is passed to Thread.sleep and is not used as an HTTP timeout.
-   */
-  private int readIntervalMillis = 500;
-
-  /**
-   * The quote URL requested with an HTTP GET; assigned from prepareURL() in setup().
-   */
-  private String url;
-  private transient HttpClient client;
-  private transient GetMethod method;
-
-  private ArrayList<String> symbolList = new ArrayList<String>();
-  private ArrayList<String> parameterList = new ArrayList<String>();
-
-  public void addSymbol(String symbol)
-  {
-    symbolList.add(symbol);
-  }
-
-  public void addFormat(String format)
-  {
-    parameterList.add(format);
-  }
-
-  public ArrayList<String> getSymbolList()
-  {
-    return symbolList; // returns the internal list itself, not a copy
-  }
-
-  public ArrayList<String> getParameterList()
-  {
-    return parameterList; // returns the internal list itself, not a copy
-  }
-
-  public int getReadIntervalMillis()
-  {
-    return readIntervalMillis;
-  }
-
-  public void setReadIntervalMillis(int readIntervalMillis)
-  {
-    this.readIntervalMillis = readIntervalMillis;
-  }
-
-  /**
-   * Prepare URL from symbols (comma-separated) and format parameters (concatenated without a separator).
-   * URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=GOOG,FB,YHOO&f=sl1vt1&e=.csv
-   * @return the URL
-   */
-  private String prepareURL()
-  {
-    String str = "http://download.finance.yahoo.com/d/quotes.csv?";
-
-    str += "s=";
-    for (int i = 0; i < symbolList.size(); i++) {
-      if (i != 0) {
-        str += ",";
-      }
-      str += symbolList.get(i); // symbols are appended as-is, without URL encoding
-    }
-    str += "&f=";
-    for (String format: parameterList) {
-      str += format;
-    }
-    str += "&e=.csv";
-    return str;
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    url = prepareURL();
-    client = new HttpClient();
-    method = new GetMethod(url); // this single GetMethod instance is re-executed on every loop iteration in run()
-    DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY); // NOTE(review): looks like this changes shared default params, not only this client — confirm
-  }
-
-  @Override
-  public void run()
-  {
-    while (true) { // NOTE(review): no exit condition; an interrupt is caught below and the loop simply continues
-      try {
-        int statusCode = client.executeMethod(method);
-        if (statusCode != HttpStatus.SC_OK) {
-          logger.error("Method failed: " + method.getStatusLine());
-        } else {
-          InputStream istream;
-          istream = method.getResponseBodyAsStream(); // NOTE(review): stream and readers below are never closed
-          // Process response
-          InputStreamReader isr = new InputStreamReader(istream); // no charset given, so the platform default is used
-          CSVReader reader = new CSVReader(isr);
-          List<String[]> myEntries;
-          myEntries = reader.readAll();
-          for (String[] stringArr: myEntries) {
-            HashMap<String,Object> hm = new HashMap<String,Object>();
-            for (int i = 0; i < parameterList.size(); i++) {
-              hm.put(parameterList.get(i), stringArr[i]); // NOTE(review): ArrayIndexOutOfBoundsException if a row has fewer fields than parameterList
-            }
-            outputPort.emit(hm); // send out one symbol at a time
-          }
-        }
-        Thread.sleep(readIntervalMillis); // runs after both the error and the success branch
-      } catch (InterruptedException ex) {
-        logger.debug(ex.toString()); // NOTE(review): the interrupt status is not restored
-      } catch (IOException ex) {
-        logger.debug(ex.toString()); // NOTE(review): the sleep is skipped on this path, so a failing request is retried immediately
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif
deleted file mode 100644
index 8fc8331..0000000
Binary files a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif and /dev/null differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java
deleted file mode 100644
index 992bc30..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Yahoo Finance demonstration applications.
- */
-package com.datatorrent.demos.yahoofinance;