You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:21 UTC
[16/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
deleted file mode 100644
index e8a91b2..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Computes word frequencies per file and globally, and writes the top N pairs to an output file
- * and to snapshot servers for visualization.
- * Currently designed to work with only 1 file at a time; will be enhanced later to support
- * multiple files dropped into the monitored directory at the same time.
- *
- * <p>
- * Receives per-window list of pairs (word, frequency) on the input port. When the end of a file
- * is reached, expects to get an EOF on the control port; at the next endWindow, the top N words
- * and frequencies are computed and emitted to the output ports.
- * <p>
- * There are 3 output ports: (a) One for the per-file top N counts emitted when the file is fully
- * read and is written to the output file. (b) One for the top N counts emitted per window for the
- * current file to the snapshot server and (c) One for the global top N counts emitted per window
- * to a different snapshot server.
- *
- * Since the EOF is received by a single operator, this operator cannot be partitioned.
- *
- * @since 3.2.0
- */
-public class FileWordCount extends BaseOperator
-{
-  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
-  private static final String GLOBAL = "global";
-
-  /**
-   * Orders pairs by descending frequency; {@code Integer.compare} is used instead of subtraction
-   * so that large counts cannot overflow and invert the order.
-   */
-  private static final Comparator<WCPair> FREQ_DESCENDING = new Comparator<WCPair>()
-  {
-    @Override
-    public int compare(WCPair o1, WCPair o2)
-    {
-      return Integer.compare(o2.freq, o1.freq);
-    }
-  };
-
-  /**
-   * If {@literal topN > 0}, only data for the topN most frequent words is output; if topN == 0, the
-   * entire frequency map is output
-   */
-  protected int topN;
-
-  /**
-   * Set to true when an EOF control tuple for the current input file is received; reset to false
-   * when the corresponding output file has been written.
-   */
-  protected boolean eof = false;
-
-  /**
-   * Last component of path (just the file name)
-   * incoming value from control tuple
-   */
-  protected String fileName;
-
-  /**
-   * {@literal (word => frequency)} map: current file, all words
-   */
-  protected Map<String, WCPair> wordMapFile = new HashMap<>();
-
-  /**
-   * {@literal (word => frequency)} map: global, all words
-   */
-  protected Map<String, WCPair> wordMapGlobal = new HashMap<>();
-
-  /**
-   * Per-file top N data most recently sent on {@code outputPerFile}; re-allocated before each
-   * emission so that a tuple that has already been emitted is never mutated.
-   */
-  protected transient List<Map<String, Object>> resultPerFile;
-
-  /**
-   * Global top N data most recently sent on {@code outputGlobal}; re-allocated before each
-   * emission so that a tuple that has already been emitted is never mutated.
-   */
-  protected transient List<Map<String, Object>> resultGlobal;
-
-  /**
-   * Singleton map of {@code fileName} to sorted list of (word, frequency) pairs; re-allocated
-   * after each emission on {@code fileOutput}
-   */
-  protected transient Map<String, Object> resultFileFinal;
-
-  /**
-   * final list of (word, frequency) pairs written to output file; re-allocated after each emission
-   */
-  protected transient List<WCPair> fileFinalList;
-
-  /**
-   * Input port on which per-window {@literal (word => frequency)} map is received; the map
-   * is merged into {@code wordMapFile} and {@code wordMapGlobal}.
-   */
-  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
-  {
-    @Override
-    public void process(List<WCPair> list)
-    {
-      // blend incoming list into wordMapFile and wordMapGlobal
-      for (WCPair pair : list) {
-        final String word = pair.word;
-        WCPair filePair = wordMapFile.get(word);
-        if (null != filePair) { // word seen previously in current file
-          WCPair globalPair = wordMapGlobal.get(word); // cannot be null
-          filePair.freq += pair.freq;
-          globalPair.freq += pair.freq;
-          continue;
-        }
-
-        // new word in current file
-        filePair = new WCPair(word, pair.freq);
-        wordMapFile.put(word, filePair);
-
-        // check global map
-        WCPair globalPair = wordMapGlobal.get(word); // may be null
-        if (null != globalPair) { // word seen previously
-          globalPair.freq += pair.freq;
-          continue;
-        }
-
-        // word never seen before
-        globalPair = new WCPair(word, pair.freq);
-        wordMapGlobal.put(word, globalPair);
-      }
-    }
-  };
-
-  /**
-   * Control port on which the current file name is received to indicate EOF
-   */
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
-  {
-    @Override
-    public void process(String msg)
-    {
-      if (msg.isEmpty()) { // sanity check
-        throw new RuntimeException("Empty file path");
-      }
-      LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN);
-      fileName = msg;
-      eof = true;
-      // NOTE: current version only supports processing one file at a time.
-    }
-  };
-
-  /**
-   * Output port for current file output
-   */
-  public final transient DefaultOutputPort<List<Map<String, Object>>>
-      outputPerFile = new DefaultOutputPort<>();
-
-  /**
-   * Output port for global output
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<List<Map<String, Object>>>
-      outputGlobal = new DefaultOutputPort<>();
-
-  /**
-   * Tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} is the final
-   * top N pairs for current file and will be written to the output file; emitted in the
-   * {@code endWindow()} call after an EOF
-   */
-  public final transient DefaultOutputPort<Map<String, Object>>
-      fileOutput = new DefaultOutputPort<>();
-
-  /**
-   * Get the number of top (word, frequency) pairs that will be output
-   */
-  public int getTopN()
-  {
-    return topN;
-  }
-
-  /**
-   * Set the number of top (word, frequency) pairs that will be output
-   * @param n The new number; a value of 0 (or less) means all pairs are output
-   */
-  public void setTopN(int n)
-  {
-    topN = n;
-  }
-
-  /**
-   * {@inheritDoc}
-   * Initialize various map and list fields
-   */
-  @Override
-  public void setup(OperatorContext context)
-  {
-    if (null == wordMapFile) {
-      wordMapFile = new HashMap<>();
-    }
-    if (null == wordMapGlobal) {
-      wordMapGlobal = new HashMap<>();
-    }
-    resultPerFile = new ArrayList<>();
-    resultGlobal = new ArrayList<>();
-    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
-    resultFileFinal = new HashMap<>(1);
-    fileFinalList = new ArrayList<>();
-  }
-
-  /**
-   * {@inheritDoc}
-   * This is where we do most of the work:
-   * 1. Sort global map and emit top N pairs
-   * 2. Sort current file map and emit top N pairs
-   * 3. If we've seen EOF, emit top N pairs on port connected to file writer and clear all per-file
-   * data structures.
-   * If no words have been seen for the current file, steps 1 and 2 are skipped; on EOF an empty
-   * list is still sent to the file writer.
-   */
-  @Override
-  public void endWindow()
-  {
-    LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN);
-
-    if (wordMapFile.isEmpty()) { // no words found
-      // log before the reset below so the file name is still available
-      LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", fileName, topN);
-      if (eof) { // got EOF, so output empty list to output file
-        fileFinalList.clear();
-        emitFileFinal();
-      }
-      return;
-    }
-
-    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", fileName, wordMapFile.size(), topN);
-
-    // get topN global list and emit to global output port; a fresh list is used each time since
-    // downstream operators may receive the emitted list by reference
-    resultGlobal = new ArrayList<>();
-    getTopNMap(wordMapGlobal, resultGlobal);
-    LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size());
-    outputGlobal.emit(resultGlobal);
-
-    // get topN list for this file and emit to per-file output port
-    resultPerFile = new ArrayList<>();
-    getTopNMap(wordMapFile, resultPerFile);
-    LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size());
-    outputPerFile.emit(resultPerFile);
-
-    if (eof) { // got EOF earlier
-      if (null == fileName) { // need file name to emit topN pairs to file writer
-        throw new RuntimeException("EOF but no fileName at endWindow");
-      }
-
-      // so compute final topN list from wordMapFile into fileFinalList and emit it
-      getTopNList(wordMapFile);
-      emitFileFinal();
-      wordMapFile.clear();
-    }
-  }
-
-  /**
-   * Emit the singleton map {@code fileName => fileFinalList} on {@code fileOutput} and reset the
-   * per-file EOF state. Fresh containers are allocated afterwards instead of clearing the emitted
-   * ones, since the emitted tuple may be handed to downstream operators by reference.
-   */
-  private void emitFileFinal()
-  {
-    resultFileFinal.put(fileName, fileFinalList);
-    fileOutput.emit(resultFileFinal);
-
-    // reset for next file
-    eof = false;
-    fileName = null;
-    resultFileFinal = new HashMap<>(1);
-    fileFinalList = new ArrayList<>();
-  }
-
-  /**
-   * Replace the contents of {@code result} with the topN most frequent pairs of {@code map} (all
-   * pairs if {@code topN <= 0} or the map has at most topN entries), each converted to a 2-entry
-   * map {@code {"word": word, "count": freq}}; this is suitable input to AppDataSnapshotServer.
-   * @param map source {@literal (word => pair)} map; may be empty
-   * @param result list to fill, in descending order of frequency
-   */
-  private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result)
-  {
-    final List<WCPair> list = sortedTopN(map);
-
-    result.clear();
-    for (WCPair pair : list) {
-      Map<String, Object> wmap = new HashMap<>(2);
-      wmap.put("word", pair.word);
-      wmap.put("count", pair.freq);
-      result.add(wmap);
-    }
-    LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size());
-  }
-
-  /**
-   * Populate {@code fileFinalList} with the topN most frequent pairs of {@code map} (all pairs if
-   * {@code topN <= 0}), in descending order of frequency; this list is suitable input to
-   * WordCountWriter which writes it to a file.
-   * @param map source {@literal (word => pair)} map; may be empty
-   */
-  private void getTopNList(final Map<String, WCPair> map)
-  {
-    fileFinalList.clear();
-    fileFinalList.addAll(sortedTopN(map));
-    LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", fileFinalList.size());
-  }
-
-  /**
-   * Return the values of {@code map} sorted by descending frequency and truncated to the first
-   * topN entries when {@code topN > 0}. Truncation is skipped when there are at most topN entries,
-   * because {@code subList(topN, size)} would throw IndexOutOfBoundsException for {@code topN > size}.
-   */
-  private List<WCPair> sortedTopN(final Map<String, WCPair> map)
-  {
-    final List<WCPair> list = new ArrayList<>(map.values());
-    Collections.sort(list, FREQ_DESCENDING);
-    if (topN > 0 && list.size() > topN) {
-      list.subList(topN, list.size()).clear(); // retain only the first topN entries
-    }
-    return list;
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
deleted file mode 100644
index 8a1a57b..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
-
-/**
- * Reads lines from the input file, decoding them as UTF-8, and emits them on {@code output}. When
- * EOF is reached, the file name is emitted as a control tuple on {@code control} (if connected).
- *
- * @since 3.2.0
- */
-public class LineReader extends AbstractFileInputOperator<String>
-{
-  private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
-
-  /**
-   * Output port on which lines from current file name are emitted
-   */
-  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-
-  /**
-   * Control port on which the current file name is emitted to indicate EOF
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();
-
-  private transient BufferedReader br = null;
-
-  private Path path;
-
-  /**
-   * File open callback; wrap the file input stream in a buffered reader for reading lines.
-   * The stream is decoded as UTF-8 instead of the platform default charset, so the words read
-   * do not depend on the JVM's default encoding.
-   * @param curPath The path to the file just opened
-   * @return the stream opened by the superclass
-   */
-  @Override
-  protected InputStream openFile(Path curPath) throws IOException
-  {
-    LOG.info("openFile: curPath = {}", curPath);
-    path = curPath;
-    InputStream is = super.openFile(path);
-    br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
-    return is;
-  }
-
-  /**
-   * File close callback; closes the stream via the superclass and then the buffered reader that
-   * wraps it. The reader and path are reset even if closing the stream fails.
-   * @param is File input stream to close
-   */
-  @Override
-  protected void closeFile(InputStream is) throws IOException
-  {
-    try {
-      super.closeFile(is);
-    } finally {
-      final BufferedReader reader = br;
-      br = null;
-      path = null;
-      if (null != reader) {
-        reader.close();
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   * Returns the next line, or null at EOF; on EOF the file name (last path component) is also
-   * emitted on the control port if it is connected.
-   */
-  @Override
-  protected String readEntity() throws IOException
-  {
-    // try to read a line
-    final String line = br.readLine();
-    if (null != line) { // common case
-      LOG.debug("readEntity: line = {}", line);
-      return line;
-    }
-
-    // end-of-file; send control tuple, containing only the last component of the path
-    // (only file name) on control port
-    if (control.isConnected()) {
-      LOG.info("readEntity: EOF for {}", path);
-      final String name = path.getName(); // final component of path
-      control.emit(name);
-    }
-
-    return null;
-  }
-
-  /**
-   * Emit a line read from the file on the output port.
-   * @param tuple the line to emit
-   */
-  @Override
-  protected void emit(String tuple)
-  {
-    output.emit(tuple);
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
deleted file mode 100644
index bb67622..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-/**
- * Mutable holder for a word together with the number of times it has been seen.
- *
- * @since 3.2.0
- */
-public class WCPair
-{
-  /** The word being counted. */
-  public String word;
-
-  /** How many times {@link #word} has occurred. */
-  public int freq;
-
-  /**
-   * Creates an empty pair: {@code word} is null and {@code freq} is zero.
-   */
-  public WCPair()
-  {
-    this(null, 0);
-  }
-
-  /**
-   * Creates a pair holding the supplied word and count.
-   * @param w the word
-   * @param f its frequency
-   */
-  public WCPair(String w, int f)
-  {
-    this.word = w;
-    this.freq = f;
-  }
-
-  /**
-   * Renders the pair as {@code (word, freq)}.
-   */
-  @Override
-  public String toString()
-  {
-    final String pattern = "(%s, %d)";
-    return String.format(pattern, word, freq);
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
deleted file mode 100644
index 0edfd1e..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Computes word frequencies per window and emits them at each {@code endWindow()}. The output is a
- * list of (word, frequency) pairs
- *
- * @since 3.2.0
- */
-public class WindowWordCount extends BaseOperator
-{
-  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);
-
-  /** {@literal (word => frequency)} map for current window */
-  protected Map<String, WCPair> wordMap = new HashMap<>();
-
-  /**
-   * Input port on which words are received
-   */
-  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
-  {
-    @Override
-    public void process(String word)
-    {
-      WCPair pair = wordMap.get(word);
-      if (null != pair) { // word seen previously
-        pair.freq += 1;
-        return;
-      }
-
-      // new word
-      pair = new WCPair();
-      pair.word = word;
-      pair.freq = 1;
-      wordMap.put(word, pair);
-    }
-  };
-
-  /**
-   * Output port which emits the list of word frequencies for current window
-   */
-  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();
-
-  /**
-   * {@inheritDoc}
-   * If we've seen some words in this window, emit them as a list and clear the map for the next
-   * window; nothing is emitted for a window without words.
-   */
-  @Override
-  public void endWindow()
-  {
-    LOG.info("WindowWordCount: endWindow");
-
-    // no words in this window; nothing to emit
-    if (wordMap.isEmpty()) {
-      return;
-    }
-
-    // emit a fresh list of this window's pairs. The list must not be modified after emit(), since
-    // downstream operators may receive it by reference; only the map is cleared, so the pairs
-    // for the next window are newly created objects.
-    final List<WCPair> list = new ArrayList<>(wordMap.values());
-    output.emit(list);
-    wordMap.clear();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
deleted file mode 100644
index 3a88bab..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.Locale;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
-
-/**
- * Input operator that repeatedly reads a text resource from the classpath, splits each line into
- * lower-case words and emits them on {@code outputPort}, pausing for a randomized interval
- * ({@code averageSleep} +/- {@code sleepPlusMinus} milliseconds) after each line.
- *
- * @since 0.3.2
- */
-public class WordCountInputOperator extends SimpleSinglePortInputOperator<String> implements Runnable
-{
-
-  private static final Logger logger = LoggerFactory.getLogger(WordCountInputOperator.class);
-  protected long averageSleep = 300;
-  protected long sleepPlusMinus = 100;
-  protected String fileName = "com/datatorrent/demos/wordcount/samplefile.txt";
-
-  /**
-   * @param as average pause, in milliseconds, after each line
-   */
-  public void setAverageSleep(long as)
-  {
-    averageSleep = as;
-  }
-
-  /**
-   * @param spm maximum random deviation, in milliseconds, from the average pause
-   */
-  public void setSleepPlusMinus(long spm)
-  {
-    sleepPlusMinus = spm;
-  }
-
-  /**
-   * @param fn classpath resource name of the text to read
-   */
-  public void setFileName(String fn)
-  {
-    fileName = fn;
-  }
-
-  /**
-   * Emits the words of resource {@code fileName} line by line, starting over at its end, until
-   * the thread is interrupted. Returns (after logging an error) if the resource is not found.
-   */
-  @Override
-  public void run()
-  {
-    while (!Thread.currentThread().isInterrupted()) {
-      final InputStream fstream = getClass().getClassLoader().getResourceAsStream(fileName);
-      if (null == fstream) {
-        // a missing resource would otherwise surface as a NullPointerException in this thread
-        logger.error("Resource {} not found on the classpath", fileName);
-        return;
-      }
-
-      // NOTE(review): assumes the resource is UTF-8 encoded (the splitter expects curly quotes)
-      try (InputStream in = fstream;
-          BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
-        String line;
-        while ((line = br.readLine()) != null) {
-          emitWords(line);
-          if (!pause()) {
-            return; // interrupted: stop producing
-          }
-        }
-      } catch (IOException ex) {
-        logger.warn("Error reading {}", fileName, ex);
-      }
-    }
-  }
-
-  /**
-   * Split a line on punctuation, whitespace and straight or curly quotes, and emit each non-empty
-   * word, lower-cased independently of the default locale, on {@code outputPort}.
-   * @param line the line to split
-   */
-  private void emitWords(String line)
-  {
-    final String[] words = line.trim().split("[\\p{Punct}\\s\\\"\\'\u201c\u201d]+");
-    for (String word : words) {
-      final String normalized = word.trim().toLowerCase(Locale.ROOT);
-      if (!normalized.isEmpty()) {
-        outputPort.emit(normalized);
-      }
-    }
-  }
-
-  /**
-   * Sleep for {@code averageSleep} +/- a random amount of up to {@code sleepPlusMinus} ms; the
-   * delay is clamped at zero because {@code Thread.sleep} rejects negative values.
-   * @return false if the thread was interrupted (its interrupt status is restored), else true
-   */
-  private boolean pause()
-  {
-    final long delay = averageSleep + (long)(sleepPlusMinus * (Math.random() * 2 - 1));
-    try {
-      Thread.sleep(Math.max(0L, delay));
-      return true;
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      return false;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
deleted file mode 100644
index 30aab10..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
-
-/**
- * Writes the top N words and their frequencies to a file. Each tuple is a singleton map
- * {@literal (fileName => L)} where L is a list of {@link WCPair}; one line {@code word : freq} is
- * written per pair to the file {@code fileName}, whose finalization is requested at end of window.
- *
- * @since 3.2.0
- */
-public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
-{
-  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
-  private static final String nl = System.lineSeparator();
-
-  /** name of the file written in the current window; null when there is nothing to finalize */
-  private String fileName;
-  private final transient StringBuilder sb = new StringBuilder();
-
-  /**
-   * {@inheritDoc}
-   * Invoke requestFinalize() to create the output file with the desired name without decorations.
-   * The file name is then cleared so that finalization is not requested again in later windows.
-   */
-  @Override
-  public void endWindow()
-  {
-    if (null != fileName) {
-      requestFinalize(fileName);
-      fileName = null;
-    }
-    super.endWindow();
-  }
-
-  /**
-   * Extracts file name from argument and remembers it for finalization in {@link #endWindow()}
-   * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs}
-   * @return the file name to write the tuple to
-   */
-  @Override
-  protected String getFileName(Map<String, Object> tuple)
-  {
-    LOG.info("getFileName: tuple.size = {}", tuple.size());
-
-    fileName = firstEntry(tuple).getKey();
-    LOG.info("getFileName: fileName = {}", fileName);
-    return fileName;
-  }
-
-  /**
-   * Extracts output file content from argument
-   * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs}
-   * @return one {@code word : freq} line per pair, encoded as UTF-8
-   */
-  @Override
-  @SuppressWarnings("unchecked") // the tuple value is expected to be a List<WCPair>
-  protected byte[] getBytesForTuple(Map<String, Object> tuple)
-  {
-    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
-
-    // get first and only pair; key is the fileName and is ignored here
-    final List<WCPair> list = (List<WCPair>)firstEntry(tuple).getValue();
-
-    sb.setLength(0); // clear buffer
-    for (WCPair pair : list) {
-      sb.append(pair.word).append(" : ").append(pair.freq).append(nl);
-    }
-
-    final String data = sb.toString();
-    LOG.debug("getBytesForTuple: data = {}", data);
-    return data.getBytes(StandardCharsets.UTF_8);
-  }
-
-  /**
-   * Return the (first and only) entry of a {@literal (fileName => L)} tuple.
-   * @throws IllegalArgumentException if the tuple is empty
-   */
-  private static Map.Entry<String, Object> firstEntry(Map<String, Object> tuple)
-  {
-    if (tuple.isEmpty()) {
-      throw new IllegalArgumentException("Empty tuple; expected a singleton map fileName => list");
-    }
-    return tuple.entrySet().iterator().next();
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
deleted file mode 100644
index 58c44b4..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import java.util.regex.Pattern;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Splits each incoming line into words using a configurable separator regex and emits every
- * non-empty word.
- *
- * @since 3.3.0
- */
-public class WordReader extends BaseOperator
-{
-  /** Separator pattern used when no {@code nonWordStr} is configured: punctuation and whitespace. */
-  private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");
-
-  /** User-supplied separator regex, or null to use {@link #nonWordDefault}. */
-  private String nonWordStr;
-
-  /** Separator pattern in effect; resolved in {@link #setup(OperatorContext)}. */
-  private transient Pattern nonWord;
-
-  /**
-   * Emits the words extracted from each received line.
-   */
-  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-
-  /**
-   * Receives lines of text to be split into words.
-   */
-  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
-  {
-    @Override
-    public void process(String line)
-    {
-      for (String token : nonWord.split(line)) {
-        if (!token.isEmpty()) {
-          output.emit(token);
-        }
-      }
-    }
-  };
-
-  /**
-   * @return the regex matching text between words, or null if the default is used
-   */
-  public String getNonWordStr()
-  {
-    return nonWordStr;
-  }
-
-  /**
-   * @param regex regex matching text between words; compiled at the next setup
-   */
-  public void setNonWordStr(String regex)
-  {
-    nonWordStr = regex;
-  }
-
-  /**
-   * {@inheritDoc}
-   * Compiles the configured separator regex, falling back to the default pattern when unset.
-   */
-  @Override
-  public void setup(OperatorContext context)
-  {
-    nonWord = (nonWordStr == null) ? nonWordDefault : Pattern.compile(nonWordStr);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg
deleted file mode 100644
index 054baed..0000000
Binary files a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg and /dev/null differ
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java
deleted file mode 100644
index d00397d..0000000
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Streaming word count demonstration application.
- */
-package com.datatorrent.demos.wordcount;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/META-INF/properties.xml b/demos/wordcount/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 1d3594e..0000000
--- a/demos/wordcount/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,98 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <!-- TopNWordsWithQueries -->
-
- <!-- for debugging -->
- <!--
- <property>
- <name>dt.attr.CONTAINER_JVM_OPTIONS</name>
- <value>-Dlog4j.configuration=my_log4j.properties</value>
- </property>
- -->
-
- <!-- monitored input directory -->
- <property>
- <name>dt.application.TopNWordsWithQueries.operator.lineReader.directory</name>
- <value>/tmp/test/input-dir</value>
- </property>
-
- <!-- regular expression for word separator -->
- <property>
- <name>dt.application.TopNWordsWithQueries.operator.wordReader.nonWordStr</name>
- <value>[\p{Punct}\s]+</value>
- </property>
-
- <!-- output directory for word counts -->
- <property>
- <name>dt.application.TopNWordsWithQueries.operator.wcWriter.filePath</name>
- <value>/tmp/test/output-dir</value>
- </property>
-
- <!-- Top N value -->
- <property>
- <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.topN</name>
- <value>10</value>
- </property>
-
- <!-- topic for queries (current file) -->
- <property>
- <name>dt.application.TopNWordsWithQueries.operator.snapshotServerFile.embeddableQueryInfoProvider.topic</name>
- <value>TopNWordsQueryFile</value>
- </property>
-
- <!-- topic for query results (current file) -->
- <property>
- <name>dt.application.TopNWordsWithQueries.operator.wsResultFile.topic</name>
- <value>TopNWordsQueryFileResult</value>
- </property>
-
- <!-- topic for queries (global) -->
- <property>
- <name>dt.application.TopNWordsWithQueries.operator.snapshotServerGlobal.embeddableQueryInfoProvider.topic</name>
- <value>TopNWordsQueryGlobal</value>
- </property>
-
- <!-- topic for query results (global) -->
- <property>
- <name>dt.application.TopNWordsWithQueries.operator.wsResultGlobal.topic</name>
- <value>TopNWordsQueryGlobalResult</value>
- </property>
-
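- <!-- retry count for the wsResult operator (NOTE: this key targets the TwitterDemo application, not TopNWordsWithQueries) -->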
- <property>
- <name>dt.application.TwitterDemo.operator.wsResult.numRetries</name>
- <value>2147483647</value>
- </property>
-
-
- <!-- WordCountDemo -->
- <property>
- <name>dt.application.WordCountDemo.operator.wordinput.fileName</name>
- <value>samplefile.txt</value>
- </property>
- <property>
- <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name>
- <value>CONTAINER_LOCAL</value>
- <description>Specify container locality for the stream between the wordinput and count operators
- </description>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/WordDataSchema.json
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/WordDataSchema.json b/demos/wordcount/src/main/resources/WordDataSchema.json
deleted file mode 100644
index 5e8e7c0..0000000
--- a/demos/wordcount/src/main/resources/WordDataSchema.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
- "values": [{"name": "word", "type": "string"},
- {"name": "count", "type": "integer"}]
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt b/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt
deleted file mode 100644
index 83eaaed..0000000
--- a/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt
+++ /dev/null
@@ -1 +0,0 @@
-CONTENT DELETED
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/samplefile.txt
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/resources/samplefile.txt b/demos/wordcount/src/main/resources/samplefile.txt
deleted file mode 100644
index 02a5e70..0000000
--- a/demos/wordcount/src/main/resources/samplefile.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-CONTENT DELETED
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/site/conf/my-app-conf1.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/site/conf/my-app-conf1.xml b/demos/wordcount/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index f35873b..0000000
--- a/demos/wordcount/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1024</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java b/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
deleted file mode 100644
index 1df0459..0000000
--- a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.wordcount;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-
-/**
- * Smoke test that launches the word count {@link Application} in local mode using the
- * properties from {@code dt-site-wordcount.xml}.
- */
-public class ApplicationTest
-{
-  // Loggers are per-class constants; an instance-level transient logger is unnecessary.
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
-
-  @Test
-  public void testSomeMethod() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    // Supplies the input file name and stream locality for the WordCountDemo application.
-    conf.addResource("dt-site-wordcount.xml");
-    lma.prepareDAG(new Application(), conf);
-    LocalMode.Controller lc = lma.getController();
-    long start = System.currentTimeMillis();
-    // NOTE(review): passes a 300000 ms (5 minute) run duration; consider a shorter run for CI.
-    lc.run(300000);
-    LOG.debug("Test used {} ms", System.currentTimeMillis() - start);
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/resources/dt-site-wordcount.xml
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/test/resources/dt-site-wordcount.xml b/demos/wordcount/src/test/resources/dt-site-wordcount.xml
deleted file mode 100644
index a25dac4..0000000
--- a/demos/wordcount/src/test/resources/dt-site-wordcount.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <property>
- <name>dt.application.WordCountDemo.class</name>
- <value>com.datatorrent.demos.wordcount.Application</value>
- <description>An alias for the application</description>
- </property>
- <property>
- <name>dt.application.WordCountDemo.operator.wordinput.fileName</name>
- <value>samplefile.txt</value>
- </property>
- <property>
- <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name>
- <value>CONTAINER_LOCAL</value>
- <description>Specify container locality for the stream between the wordinput and count operators
- </description>
- </property>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/test/resources/log4j.properties b/demos/wordcount/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/wordcount/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/pom.xml
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/pom.xml b/demos/yahoofinance/pom.xml
deleted file mode 100644
index 819a475..0000000
--- a/demos/yahoofinance/pom.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>yahoo-finance-demo</artifactId>
- <packaging>jar</packaging>
-
- <name>Apache Apex Malhar Yahoo! Finance Demo</name>
- <description>Apex demo applications that get Yahoo finance feed and calculate minute price range, minute volume and simple moving average.</description>
-
- <parent>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-demos</artifactId>
- <version>3.7.0-SNAPSHOT</version>
- </parent>
-
- <properties>
- <skipTests>true</skipTests>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>net.sf.opencsv</groupId>
- <artifactId>opencsv</artifactId>
- <version>2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <version>10.9.1.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.apex</groupId>
- <artifactId>malhar-contrib</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/assemble/appPackage.xml b/demos/yahoofinance/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/yahoofinance/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>appPackage</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>${basedir}/target/</directory>
- <outputDirectory>/app</outputDirectory>
- <includes>
- <include>${project.artifactId}-${project.version}.jar</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/target/deps</directory>
- <outputDirectory>/lib</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/site/conf</directory>
- <outputDirectory>/conf</outputDirectory>
- <includes>
- <include>*.xml</include>
- </includes>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/META-INF</directory>
- <outputDirectory>/META-INF</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>${basedir}/src/main/resources/app</directory>
- <outputDirectory>/app</outputDirectory>
- </fileSet>
- </fileSets>
-
-</assembly>
-
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
deleted file mode 100644
index 1a38495..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator;
-import org.apache.apex.malhar.contrib.misc.streamquery.DerbySqlStreamOperator;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-
-/**
- * Reads two Yahoo! Finance CSV feeds for the configured ticker symbols, joins them in an
- * embedded Derby SQL operator to compute P/E and P/B ratios, and prints the result to the console.
- * <p>
- * Symbols are read from the {@code <class name>.tickerSymbols} configuration property as a
- * comma-separated list; surrounding whitespace and empty entries are ignored.
- *
- * @since 0.3.2
- */
-@ApplicationAnnotation(name = "YahooFinanceWithDerbySQLDemo")
-public class ApplicationWithDerbySQL implements StreamingApplication
-{
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    String symbolStr = conf.get(ApplicationWithDerbySQL.class.getName() + ".tickerSymbols", "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM");
-
-    String[] symbols = symbolStr.split(",");
-
-    YahooFinanceCSVInputOperator input1 = dag.addOperator("input1", new YahooFinanceCSVInputOperator());
-    YahooFinanceCSVInputOperator input2 = dag.addOperator("input2", new YahooFinanceCSVInputOperator());
-    DerbySqlStreamOperator sqlOper = dag.addOperator("sqlOper", new DerbySqlStreamOperator());
-    ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator());
-
-    for (String rawSymbol : symbols) {
-      // Tolerate "A, B" and stray commas ("A,,B"): a blank or space-padded symbol would corrupt the quote URL.
-      String symbol = rawSymbol.trim();
-      if (symbol.isEmpty()) {
-        continue;
-      }
-      input1.addSymbol(symbol);
-      input2.addSymbol(symbol);
-    }
-    input1.addFormat("s0");
-    input1.addFormat("l1");
-    input2.addFormat("s0");
-    input2.addFormat("e0");
-    input2.addFormat("b4");
-
-    AbstractSqlStreamOperator.InputSchema inputSchema1 = new AbstractSqlStreamOperator.InputSchema("t1");
-    AbstractSqlStreamOperator.InputSchema inputSchema2 = new AbstractSqlStreamOperator.InputSchema("t2");
-    inputSchema1.setColumnInfo("s0", "varchar(100)", true); // symbol
-    inputSchema1.setColumnInfo("l1", "float", false); // last trade
-    inputSchema2.setColumnInfo("s0", "varchar(100)", true); // symbol
-    inputSchema2.setColumnInfo("e0", "float", false); // EPS
-    inputSchema2.setColumnInfo("b4", "float", false); // Book value
-
-    sqlOper.setInputSchema(0, inputSchema1);
-    sqlOper.setInputSchema(1, inputSchema2);
-
-    // Calculate PE Ratio and PB Ratio using SQL
-    sqlOper.addExecStatementString("SELECT SESSION.t1.s0 AS symbol, SESSION.t1.l1 / SESSION.t2.e0 AS pe_ratio, SESSION.t1.l1 / SESSION.t2.b4 AS pb_ratio FROM SESSION.t1,SESSION.t2 WHERE SESSION.t1.s0 = SESSION.t2.s0");
-
-    dag.addStream("input1_sql", input1.outputPort, sqlOper.in1);
-    dag.addStream("input2_sql", input2.outputPort, sqlOper.in2);
-
-    dag.addStream("result_console", sqlOper.result, consoleOperator.input);
-
-  }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
deleted file mode 100644
index 01e3ce9..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.cookie.CookiePolicy;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.DefaultHttpParams;
-import org.apache.hadoop.util.StringUtils;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.KeyValPair;
-
-import au.com.bytecode.opencsv.CSVReader;
-
-/**
- * This operator sends price, volume and time into separate ports and calculates incremental volume.
- *
- * @since 0.3.2
- */
-public class StockTickInput implements InputOperator
-{
- private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);
- /**
- * Timeout interval for reading from server. 0 or negative indicates no timeout.
- */
- public int readIntervalMillis = 500;
- /**
- * The URL of the web service resource for the POST request.
- */
- private String url;
- private String[] symbols;
- @NotNull
- private String tickers;
- private transient HttpClient client;
- private transient GetMethod method;
- private HashMap<String, Long> lastVolume = new HashMap<String, Long>();
- private boolean outputEvenIfZeroVolume = false;
- /**
- * The output port to emit price.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>();
- /**
- * The output port to emit incremental volume.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>();
- /**
- * The output port to emit last traded time.
- */
- @OutputPortFieldAnnotation(optional = true)
- public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>();
-
- /**
- * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1
- *
- * @return the URL
- */
- private String prepareURL()
- {
- String str = "http://download.finance.yahoo.com/d/quotes.csv?s=";
- for (int i = 0; i < symbols.length; i++) {
- if (i != 0) {
- str += ",";
- }
- str += symbols[i];
- }
- str += "&f=sl1vt1&e=.csv";
- return str;
- }
-
- @Override
- public void setup(OperatorContext context)
- {
- url = prepareURL();
- client = new HttpClient();
- method = new GetMethod(url);
- DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY);
- }
-
- @Override
- public void teardown()
- {
- }
-
- @Override
- public void emitTuples()
- {
-
- try {
- int statusCode = client.executeMethod(method);
- if (statusCode != HttpStatus.SC_OK) {
- logger.error("Method failed: " + method.getStatusLine());
- } else {
- InputStream istream = method.getResponseBodyAsStream();
- // Process response
- InputStreamReader isr = new InputStreamReader(istream);
- CSVReader reader = new CSVReader(isr);
- List<String[]> myEntries = reader.readAll();
- for (String[] stringArr: myEntries) {
- ArrayList<String> tuple = new ArrayList<String>(Arrays.asList(stringArr));
- if (tuple.size() != 4) {
- return;
- }
- // input csv is <Symbol>,<Price>,<Volume>,<Time>
- String symbol = tuple.get(0);
- double currentPrice = Double.valueOf(tuple.get(1));
- long currentVolume = Long.valueOf(tuple.get(2));
- String timeStamp = tuple.get(3);
- long vol = currentVolume;
- // Sends total volume in first tick, and incremental volume afterwards.
- if (lastVolume.containsKey(symbol)) {
- vol -= lastVolume.get(symbol);
- }
-
- if (vol > 0 || outputEvenIfZeroVolume) {
- price.emit(new KeyValPair<String, Double>(symbol, currentPrice));
- volume.emit(new KeyValPair<String, Long>(symbol, vol));
- time.emit(new KeyValPair<String, String>(symbol, timeStamp));
- lastVolume.put(symbol, currentVolume);
- }
- }
- }
- Thread.sleep(readIntervalMillis);
- } catch (InterruptedException ex) {
- logger.debug(ex.toString());
- } catch (IOException ex) {
- logger.debug(ex.toString());
- }
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void endWindow()
- {
- }
-
- public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume)
- {
- this.outputEvenIfZeroVolume = outputEvenIfZeroVolume;
- }
-
- public void setTickers(String tickers)
- {
- this.tickers = tickers;
- symbols = StringUtils.split(tickers, ',');
- }
-
- public String getTickers()
- {
- return tickers;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
deleted file mode 100644
index a6aaece..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.math.RangeKeyVal;
-import com.datatorrent.lib.math.SumKeyVal;
-import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
-import com.datatorrent.lib.stream.ConsolidatorKeyVal;
-import com.datatorrent.lib.util.BaseKeyValueOperator.DefaultPartitionCodec;
-import com.datatorrent.lib.util.HighLow;
-
-/**
- * Yahoo! Finance Application Demo :<br>
- * Get Yahoo finance feed and calculate minute price range, minute volume,
- * simple moving average of 5 minutes. <br>
- * <br>
- * Functional Description : <br>
- * Application samples yahoo finance ticker every 200ms. All data points in one
- * second are streamed from input adapter. <br>
- * <br>
- *
- * Application calculates following Real Time Value(s):<br>
- * <ul>
- * <li>Quotes for IBM, Google, Apple, Yahoo stocks price/volume/time displayed
- * every second.</li>
- * <li>Charts for Stocks in terms for high/low price vs volume for last minute.</li>
- * <li>Simple moving average over last 5 minutes for IBM, Google, Apple, Yahoo
- * stocks.</li>
- * </ul>
- * <br>
- * <br>
- *
- * Custom Attribute : <br>
- * <ul>
- * <li>Application streaming window size(STREAMING_WINDOW_SIZE_MILLIS) = 1 sec,
- * since we are only interested in quotes every second.</li>
- * <li>Range/Minute Volume operator's window size(APPLICATION_WINDOW_COUNT) =
- * 60, aggregate over one minute.</li>
- * <li>Sum operator window length : 300, sliding average over last 5 minutes.</li>
- * </ul>
- * <br>
- *
- * Input Adapter : <br>
- * Stock Tick input operator get yahoo finance real time stock quotes data and
- * pushes application. <br>
- * <br>
- *
- * Output Adapter : <br>
- * Output values are written to console through ConsoleOutputOerator<br>
- * if you need to change write to HDFS,HTTP .. instead of console, <br>
- * Please refer to {@link com.datatorrent.lib.io.HttpOutputOperator} or
- * {@link com.datatorrent.lib.io.fs.HdfsOutputOperator}. <br>
- * <br>
- *
- * Run Sample Application : <br>
- * <p>
- * Running Java Test or Main app in IDE:
- *
- * <pre>
- * LocalMode.runApp(new Application(), 600000); // 10 min run
- * </pre>
- *
- * Run Success : <br>
- * For successful deployment and run, user should see following output on
- * console:
- *
- * <pre>
- * Price SMA: AAPL=435.965
- * Price SMA: GOOG=877.0
- * QUOTE: {YHOO=[26.37, 9760360, 4:00pm, null, null], IBM=[203.77, 2899698, 4:00pm, null, null], GOOG=[877.0, 2069614, 4:00pm, null, null], AAPL=[435.965, 10208099, 4:00pm, null, null]}
- * Price SMA: YHOO=26.37
- * </pre>
- *
- * Scaling Options : <br>
- * <ul>
- * <li>Volume operator can be replicated using a {@link StatelessPartitioner}
- * on an operator.</li>
- * <li>Range value operator can replicated but using proper unifier
- * operator(read App Dev Guide).</li>
- * <li>Slinging window operator can be replicated with proper unifier operator.</li>
- * </ul>
- * <br>
- *
- * Application DAG : <br>
- * <img src="doc-files/Application.gif" width=600px > <br>
- * <br>
- *
- * Streaming Window Size : 1000 ms(1 Sec) <br>
- * Operator Details : <br>
- * <ul>
- * <li>
- * <p>
- * <b>The operator DailyVolume:</b> This operator reads from the input port,
- * which contains the incremental volume tuples from StockTickInput, and
- * aggregates the data to provide the cumulative volume. It just utilizes the
- * library class SumKeyVal<K,V> provided in math package. In this case,
- * SumKeyVal<String,Long>, where K is the stock symbol, V is the aggregated
- * volume, with cumulative set to true. (Otherwise if cumulative was set to
- * false, SumKeyVal would provide the sum for the application window.) The platform
- * provides a number of built-in operators for simple operations like this so
- * that application developers do not have to write them. More examples to
- * follow. This operator assumes that the application restarts before market
- * opens every day.
- * </p>
- * Class : {@link com.datatorrent.lib.math.SumKeyVal} <br>
- * Operator Application Window Count : 1 <br>
- * StateFull : Yes, volume gets aggregated every window count.</li>
- *
- * <li>
- * <p>
- * <b>The operator MinuteVolume:</b> This operator reads from the input port,
- * which contains the volume tuples from StockTickInput, and aggregates the data
- * to provide the sum of the volume within one minute. Like the operator
- * DailyVolume, this operator is also SumKeyVal<String,Long>, but with
- * cumulative set to false. Application Window is set to 1 minute. We will
- * explain how to set this later. <br>
- * Class : {@link com.datatorrent.lib.math.SumKeyVal} <br>
- * Operator App Window Count : 60 (1 Minute) <br>
- * StateFull : Yes, aggregate over last 60 windows.</li>
- *
- * <li>
- * <p>
- * <b>The operator Quote:</b> This operator has three input ports, which are
- * price (from StockTickInput), daily_vol (from Daily Volume), and time (from
- * StockTickInput). This operator just consolidates the three data and and emits
- * the consolidated data. It utilizes the class ConsolidatorKeyVal<K> from
- * stream package.<br>
- * Class : {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} <br>
- * Operator App Window Count : 1 <br>
- * StateFull : No</li>
- *
- * <li>
- * <p>
- * <b>The operator Chart:</b> This operator is very similar to the operator
- * Quote, except that it takes inputs from High Low and Minute Vol and outputs
- * the consolidated tuples to the output port. <br>
- * Class : {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} <br>
- * StateFull : No<br>
- * Operator App Window Count : 1</li>
- *
- *
- * <li>
- * <p>
- * <b>The operator PriceSMA:</b> SMA stands for - Simple Moving Average. It
- * reads from the input port, which contains the price tuples from
- * StockTickInput, and provides the moving average price of the stock. It
- * utilizes SimpleMovingAverage<String,Double>, which is provided in multiwindow
- * package. SimpleMovingAverage keeps track of the data of the previous N
- * application windows in a sliding manner. For each end window event, it
- * provides the average of the data in those application windows. <br>
- * Class : {@link com.datatorrent.lib.multiwindow.SimpleMovingAverage} <br>
- * StateFull : Yes, stores values across application window. <br>
- * Operator App Window : 1 <br>
- * Operator Sliding Window : 300 (5 mins).</li>
- *
- * <li>
- * <p>
- * <b>The operator Console: </b> This operator just outputs the input tuples to
- * the console (or stdout). In this example, there are four console operators,
- * which connect to the output of Quote, Chart, PriceSMA and VolumeSMA. In
- * practice, they should be replaced by operators which would do something about
- * the data, like drawing charts. </li>
- *
- * </ul>
- * <br>
- *
- * @since 0.3.2
- */
-@ApplicationAnnotation(name = "YahooFinanceDemo")
-public class YahooFinanceApplication implements StreamingApplication
-{
- protected int streamingWindowSizeMilliSeconds = 1000; // 1 second
- protected int appWindowCountMinute = 60; // 1 minute
- protected int appWindowCountSMA = 300; // 5 minute
- //protected String[] tickers = {"IBM", "GOOG", "AAPL", "YHOO"};
-
- /**
- * Instantiate stock input operator for actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded price.
- * @param name Operator name
- * @param dag Application DAG graph.
- * @return StockTickInput instance.
- */
- public StockTickInput getStockTickInputOperator(String name, DAG dag)
- {
- StockTickInput oper = dag.addOperator(name, StockTickInput.class);
- oper.readIntervalMillis = 200;
- //oper.symbols = tickers;
- return oper;
- }
-
- /**
- * Instantiate {@link com.datatorrent.lib.math.SumKeyVal} operator
- * to sends total daily volume by adding volumes from each ticks.
- * @param name Operator name
- * @param dag Application DAG graph.
- * @return SumKeyVal instance.
- */
- public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag)
- {
- SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
- oper.setType(Long.class);
- oper.setCumulative(true);
- return oper;
- }
-
- /**
- * Instantiate {@link com.datatorrent.lib.math.SumKeyVal} operator
- * Get aggregated volume of 1 minute and send at the end window of 1 minute.
- * @param name Operator name
- * @param dag Application DAG graph.
- * @param appWindowCount Operator aggregate window size.
- * @return SumKeyVal instance.
- */
- public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)
- {
- SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
- oper.setType(Long.class);
- dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
- return oper;
- }
-
- /**
- * Instantiate {@link com.datatorrent.lib.math.RangeKeyVal} operator to get high/low
- * value for each key within given application window.
- * Get High-low range for 1 minute.
- * @param name Operator name
- * @param dag Application DAG graph.
- * @param appWindowCount Operator aggregate window size.
- * @return RangeKeyVal instance.
- */
- public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount)
- {
- RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>());
- dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
- oper.setType(Double.class);
- return oper;
- }
-
- /**
- * Instantiate {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} to send
- * Quote (Merge price, daily volume, time)
- * @param name Operator name
- * @param dag Application DAG graph.
- * @return ConsolidatorKeyVal instance.
- */
- public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag)
- {
- ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>());
- return oper;
- }
-
- /**
- * Instantiate {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} to send
- * Chart (Merge minute volume and minute high-low)
- * @param name Operator name
- * @param dag Application DAG graph.
- * @return ConsolidatorKeyVal instance.
- */
- public ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> getChartOperator(String name, DAG dag)
- {
- ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow<Double>,Long,Object,Object,Object>());
- return oper;
- }
-
- /**
- * Instantiate {@link com.datatorrent.lib.multiwindow.SimpleMovingAverage} to calculate moving average for price
- * over given window size. Sliding window size is 1.
- * @param name Operator name
- * @param dag Application DAG graph.
- * @param appWindowCount Operator aggregate window size.
- * @return SimpleMovingAverage instance.
- */
- public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)
- {
- SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>());
- oper.setWindowSize(appWindowCount);
- oper.setType(Double.class);
- return oper;
- }
-
- /**
- * Get console for output operator.
- * @param name Operator name
- * @param dag Application DAG graph.
- * @return input port for console output.
- */
- public InputPort<Object> getConsole(String name, /*String nodeName,*/ DAG dag, String prefix)
- {
- // hack to output to HTTP based on actual environment
- /*
- String serverAddr = System.getenv("MALHAR_AJAXSERVER_ADDRESS");
- if (serverAddr != null) {
- HttpOutputOperator<Object> oper = dag.addOperator(name, new HttpOutputOperator<Object>());
- oper.setResourceURL(URI.create("http://" + serverAddr + "/channel/" + nodeName));
- return oper.input;
- }
- */
-
- ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
- oper.setStringFormat(prefix + ": %s");
- return oper.input;
- }
-
- /**
- * Populate Yahoo Finance Demo Application DAG.
- */
- @SuppressWarnings("unchecked")
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
-
- dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, streamingWindowSizeMilliSeconds);
-
- StockTickInput tick = getStockTickInputOperator("StockTickInput", dag);
- SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag);
- ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag);
-
- RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute);
- SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute);
- ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> chartOperator = getChartOperator("Chart", dag);
-
- SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA);
- DefaultPartitionCodec<String, Double> codec = new DefaultPartitionCodec<String, Double>();
- dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);
- dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);
- dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
- dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data);
- dag.addStream("time", tick.time, quoteOperator.in3);
- dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2);
-
- dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE"));
-
- dag.addStream("high_low", highlow.range, chartOperator.in1);
- dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2);
- dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART"));
-
- dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA"));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
deleted file mode 100644
index cf3801e..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.yahoofinance;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.cookie.CookiePolicy;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.DefaultHttpParams;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
-
-import au.com.bytecode.opencsv.CSVReader;
-
-/**
- * Grabs Yahoo Finance quotes data and emits HashMap, with key equals the format name (e.g. "s0") <p>
- *
- * @since 0.3.2
- */
-public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<HashMap<String, Object>> implements Runnable
-{
- private static final Logger logger = LoggerFactory.getLogger(YahooFinanceCSVInputOperator.class);
- /**
- * Timeout interval for reading from server. 0 or negative indicates no timeout.
- */
- private int readIntervalMillis = 500;
-
- /**
- * The URL of the web service resource for the POST request.
- */
- private String url;
- private transient HttpClient client;
- private transient GetMethod method;
-
- private ArrayList<String> symbolList = new ArrayList<String>();
- private ArrayList<String> parameterList = new ArrayList<String>();
-
- public void addSymbol(String symbol)
- {
- symbolList.add(symbol);
- }
-
- public void addFormat(String format)
- {
- parameterList.add(format);
- }
-
- public ArrayList<String> getSymbolList()
- {
- return symbolList;
- }
-
- public ArrayList<String> getParameterList()
- {
- return parameterList;
- }
-
- public int getReadIntervalMillis()
- {
- return readIntervalMillis;
- }
-
- public void setReadIntervalMillis(int readIntervalMillis)
- {
- this.readIntervalMillis = readIntervalMillis;
- }
-
- /**
- * Prepare URL from symbols and parameters.
- * URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=GOOG,FB,YHOO&f=sl1vt1&e=.csv
- * @return
- */
- private String prepareURL()
- {
- String str = "http://download.finance.yahoo.com/d/quotes.csv?";
-
- str += "s=";
- for (int i = 0; i < symbolList.size(); i++) {
- if (i != 0) {
- str += ",";
- }
- str += symbolList.get(i);
- }
- str += "&f=";
- for (String format: parameterList) {
- str += format;
- }
- str += "&e=.csv";
- return str;
- }
-
- @Override
- public void setup(OperatorContext context)
- {
- url = prepareURL();
- client = new HttpClient();
- method = new GetMethod(url);
- DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY);
- }
-
- @Override
- public void run()
- {
- while (true) {
- try {
- int statusCode = client.executeMethod(method);
- if (statusCode != HttpStatus.SC_OK) {
- logger.error("Method failed: " + method.getStatusLine());
- } else {
- InputStream istream;
- istream = method.getResponseBodyAsStream();
- // Process response
- InputStreamReader isr = new InputStreamReader(istream);
- CSVReader reader = new CSVReader(isr);
- List<String[]> myEntries;
- myEntries = reader.readAll();
- for (String[] stringArr: myEntries) {
- HashMap<String,Object> hm = new HashMap<String,Object>();
- for (int i = 0; i < parameterList.size(); i++) {
- hm.put(parameterList.get(i), stringArr[i]);
- }
- outputPort.emit(hm); // send out one symbol at a time
- }
- }
- Thread.sleep(readIntervalMillis);
- } catch (InterruptedException ex) {
- logger.debug(ex.toString());
- } catch (IOException ex) {
- logger.debug(ex.toString());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif
deleted file mode 100644
index 8fc8331..0000000
Binary files a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif and /dev/null differ
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java
deleted file mode 100644
index 992bc30..0000000
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Yahoo Finance demonstration applications.
- */
-package com.datatorrent.demos.yahoofinance;