Posted to commits@apex.apache.org by th...@apache.org on 2017/03/19 14:54:14 UTC

[3/3] apex-malhar git commit: Merge commit 'refs/pull/480/head' of https://github.com/apache/apex-malhar

Merge commit 'refs/pull/480/head' of https://github.com/apache/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a017dfaa
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a017dfaa
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a017dfaa

Branch: refs/heads/master
Commit: a017dfaa45e74e270b4d5619f83994388ed6dc09
Parents: a1c319c 65488fd
Author: Thomas Weise <th...@apache.org>
Authored: Sun Mar 19 07:39:52 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Mar 19 07:39:52 2017 -0700

----------------------------------------------------------------------
 .../malhar/stream/sample/MinimalWordCount.java  |   2 +-
 .../malhar/stream/sample/WindowedWordCount.java |   2 +-
 .../stream/sample/complete/AutoComplete.java    |   2 +-
 .../sample/complete/StreamingWordExtract.java   |   2 +-
 .../sample/complete/TopWikipediaSessions.java   |   2 +-
 .../stream/sample/complete/TrafficRoutes.java   |   2 +-
 .../sample/complete/TwitterAutoComplete.java    |   2 +-
 .../sample/cookbook/CombinePerKeyExamples.java  |   2 +-
 .../stream/sample/cookbook/DeDupExample.java    |   2 +-
 .../sample/cookbook/MaxPerKeyExamples.java      |   2 +-
 .../stream/sample/cookbook/TriggerExample.java  |   2 +-
 .../lib/function/AnnonymousClassModifier.java   | 134 +++++++
 .../apex/malhar/lib/function/Function.java      |  87 +++++
 .../malhar/lib/function/FunctionOperator.java   | 378 +++++++++++++++++++
 .../malhar/lib/utils/ByteArrayClassLoader.java  |  54 +++
 .../apache/apex/malhar/lib/utils/TupleUtil.java |  46 +++
 .../apex/malhar/stream/api/ApexStream.java      |   2 +-
 .../apex/malhar/stream/api/WindowedStream.java  |   5 +-
 .../malhar/stream/api/function/Function.java    |  88 -----
 .../malhar/stream/api/impl/ApexStreamImpl.java  |   6 +-
 .../stream/api/impl/ApexWindowedStreamImpl.java |   2 +-
 .../api/operator/AnnonymousClassModifier.java   | 134 -------
 .../api/operator/ByteArrayClassLoader.java      |  54 ---
 .../stream/api/operator/FunctionOperator.java   | 378 -------------------
 .../apex/malhar/stream/api/util/TupleUtil.java  |  46 ---
 .../FunctionOperator/FunctionOperatorTest.java  |   4 +-
 .../stream/sample/ApplicationWithStreamAPI.java |   2 +-
 .../LocalTestWithoutStreamApplication.java      |   2 +-
 .../apex/malhar/stream/sample/MyStream.java     |   2 +-
 .../apex/malhar/stream/sample/MyStreamTest.java |   2 +-
 .../stream/sample/WordCountWithStreamAPI.java   |   2 +-
 31 files changed, 723 insertions(+), 727 deletions(-)
----------------------------------------------------------------------
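
The substance of this merge (of pull request 480) is a package move: Function,
FunctionOperator, AnnonymousClassModifier, ByteArrayClassLoader, and TupleUtil
leave the stream API packages (org.apache.apex.malhar.stream.api.function,
.operator, .util) for the library packages org.apache.apex.malhar.lib.function
and org.apache.apex.malhar.lib.utils, with the example applications updated to
match. For code built against the old packages, only the imports change, as
each file diff below shows:

    // Before this merge:
    import org.apache.apex.malhar.stream.api.function.Function;
    // After this merge:
    import org.apache.apex.malhar.lib.function.Function;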


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
index 327c882,0000000..160175f
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@@ -1,128 -1,0 +1,128 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.apex.malhar.stream.sample;
 +
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.Map;
 +
++import org.apache.apex.malhar.lib.function.Function;
 +import org.apache.apex.malhar.lib.window.TriggerOption;
 +import org.apache.apex.malhar.lib.window.Tuple;
 +import org.apache.apex.malhar.lib.window.WindowOption;
- import org.apache.apex.malhar.stream.api.function.Function;
 +import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 +import org.apache.hadoop.conf.Configuration;
 +
 +import com.datatorrent.api.Context;
 +import com.datatorrent.api.DAG;
 +import com.datatorrent.api.DefaultInputPort;
 +import com.datatorrent.api.StreamingApplication;
 +import com.datatorrent.api.annotation.ApplicationAnnotation;
 +import com.datatorrent.common.util.BaseOperator;
 +import com.datatorrent.lib.util.KeyValPair;
 +
 +import static org.apache.apex.malhar.stream.api.Option.Options.name;
 +
 +/**
 + * Beam MinimalWordCount Example
 + *
 + * @since 3.5.0
 + */
 +@ApplicationAnnotation(name = "MinimalWordCount")
 +public class MinimalWordCount implements StreamingApplication
 +{
 +  public static class Collector extends BaseOperator
 +  {
 +    static Map<String, Long> result;
 +    private static boolean done = false;
 +
 +    public static boolean isDone()
 +    {
 +      return done;
 +    }
 +
 +    @Override
 +    public void setup(Context.OperatorContext context)
 +    {
 +      done = false;
 +      result = new HashMap<>();
 +    }
 +
 +    public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>()
 +    {
 +      @Override
 +      public void process(KeyValPair<String, Long> tuple)
 +      {
 +        if (tuple.getKey().equals("bye")) {
 +          done = true;
 +        }
 +        result.put(tuple.getKey(), tuple.getValue());
 +      }
 +    };
 +  }
 +
 +  /**
 +   * Populate the dag using the High-Level API.
 +   * @param dag
 +   * @param conf
 +   */
 +  @Override
 +  public void populateDAG(DAG dag, Configuration conf)
 +  {
 +    Collector collector = new Collector();
 +    // Create a stream reading from a file line by line using StreamFactory.
 +    StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
 +        // Use a flatmap transformation to extract words from the incoming stream of lines.
 +        .flatMap(new Function.FlatMapFunction<String, String>()
 +        {
 +          @Override
 +          public Iterable<String> f(String input)
 +          {
 +            return Arrays.asList(input.split("[^a-zA-Z']+"));
 +
 +          }
 +        }, name("ExtractWords"))
 +        // Apply windowing to the stream for counting, in this case, the window option is global window.
 +        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
 +        // Count the appearances of every word.
 +        .countByKey(new Function.ToKeyValue<String, String, Long>()
 +        {
 +          @Override
 +          public Tuple<KeyValPair<String, Long>> f(String input)
 +          {
 +            return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L));
 +          }
 +        }, name("countByKey"))
 +        // Format the counting result to a readable format by unwrapping the tuples.
 +        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>()
 +        {
 +          @Override
 +          public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
 +          {
 +            return input.getValue();
 +          }
 +        }, name("FormatResults"))
 +        // Print the result.
 +        .print(name("console"))
 +        // Attach a collector to the stream to collect results.
 +        .endWith(collector, collector.input, name("Collector"))
 +        // populate the dag using the stream.
 +        .populateDag(dag);
 +  }
 +}
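
A pattern like the following is commonly used to exercise such an application
in local mode. This is a minimal sketch assuming the standard Apex LocalMode
API; the driver class itself is hypothetical and not part of this commit:

    import org.apache.apex.malhar.stream.sample.MinimalWordCount;
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.LocalMode;

    // Hypothetical driver class, not part of this commit.
    public class MinimalWordCountLocalRun
    {
      public static void main(String[] args) throws Exception
      {
        LocalMode lma = LocalMode.newInstance();
        // Build the DAG exactly as populateDAG() above defines it.
        lma.prepareDAG(new MinimalWordCount(), new Configuration(false));
        LocalMode.Controller lc = lma.getController();
        lc.runAsync();
        // Poll the static flag the Collector sets once it sees the word "bye".
        while (!MinimalWordCount.Collector.isDone()) {
          Thread.sleep(500);
        }
        lc.shutdown();
      }
    }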

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
index 5b83bd0,0000000..6e57bfd
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@@ -1,290 -1,0 +1,290 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.apex.malhar.stream.sample;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +import org.joda.time.Duration;
++import org.apache.apex.malhar.lib.function.Function;
 +import org.apache.apex.malhar.lib.window.TriggerOption;
 +import org.apache.apex.malhar.lib.window.Tuple;
 +import org.apache.apex.malhar.lib.window.WindowOption;
 +import org.apache.apex.malhar.stream.api.ApexStream;
 +import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
 +import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 +import org.apache.commons.io.IOUtils;
 +import org.apache.hadoop.conf.Configuration;
 +
 +import com.google.common.base.Throwables;
 +import com.datatorrent.api.Context;
 +import com.datatorrent.api.DAG;
 +import com.datatorrent.api.DefaultInputPort;
 +import com.datatorrent.api.DefaultOutputPort;
 +import com.datatorrent.api.InputOperator;
 +import com.datatorrent.api.StreamingApplication;
 +import com.datatorrent.api.annotation.ApplicationAnnotation;
 +import com.datatorrent.common.util.BaseOperator;
 +import com.datatorrent.lib.util.KeyValPair;
 +
 +import static org.apache.apex.malhar.stream.api.Option.Options.name;
 +
 +/**
 + * Beam WindowedWordCount Example.
 + *
 + * @since 3.5.0
 + */
 +@ApplicationAnnotation(name = "WindowedWordCount")
 +public class WindowedWordCount implements StreamingApplication
 +{
 +  static final int WINDOW_SIZE = 1;  // Default window duration in minutes
 +
 +  /**
 +   * An input operator that reads a file and outputs it line by line to the downstream operator, with a time
 +   * gap between every two lines.
 +   */
 +  public static class TextInput extends BaseOperator implements InputOperator
 +  {
 +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 +    private boolean done = false;
 +
 +    private transient BufferedReader reader;
 +
 +    @Override
 +    public void setup(Context.OperatorContext context)
 +    {
 +      done = false;
 +      initReader();
 +    }
 +
 +    private void initReader()
 +    {
 +      try {
 +        InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt");
 +        reader = new BufferedReader(new InputStreamReader(resourceStream));
 +      } catch (Exception ex) {
 +        throw Throwables.propagate(ex);
 +      }
 +    }
 +
 +    @Override
 +    public void teardown()
 +    {
 +      IOUtils.closeQuietly(reader);
 +    }
 +
 +    @Override
 +    public void emitTuples()
 +    {
 +      if (!done) {
 +        try {
 +          String line = reader.readLine();
 +          if (line == null) {
 +            done = true;
 +            reader.close();
 +          } else {
 +            this.output.emit(line);
 +          }
 +          Thread.sleep(50);
 +        } catch (IOException ex) {
 +          throw new RuntimeException(ex);
 +        } catch (InterruptedException e) {
 +          throw Throwables.propagate(e);
 +        }
 +      }
 +    }
 +  }
 +
 +  public static class Collector extends BaseOperator
 +  {
 +    private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
 +    private static boolean done = false;
 +
 +    @Override
 +    public void setup(Context.OperatorContext context)
 +    {
 +      super.setup(context);
 +      done = false;
 +    }
 +
 +    public static boolean isDone()
 +    {
 +      return done;
 +    }
 +
 +    public static Map<KeyValPair<Long, String>, Long> getResult()
 +    {
 +      return result;
 +    }
 +
 +    public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
 +    {
 +      @Override
 +      public void process(PojoEvent tuple)
 +      {
 +        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount());
 +        if (tuple.getWord().equals("bye")) {
 +          done = true;
 +        }
 +      }
 +    };
 +  }
 +
 +  /**
 +   * A Pojo Tuple class used for outputting results (modeled on a JDBC output row).
 +   */
 +  public static class PojoEvent
 +  {
 +    private String word;
 +    private long count;
 +    private long timestamp;
 +
 +    @Override
 +    public String toString()
 +    {
 +      return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")";
 +    }
 +
 +    public String getWord()
 +    {
 +      return word;
 +    }
 +
 +    public void setWord(String word)
 +    {
 +      this.word = word;
 +    }
 +
 +    public long getCount()
 +    {
 +      return count;
 +    }
 +
 +    public void setCount(long count)
 +    {
 +      this.count = count;
 +    }
 +
 +    public long getTimestamp()
 +    {
 +      return timestamp;
 +    }
 +
 +    public void setTimestamp(long timestamp)
 +    {
 +      this.timestamp = timestamp;
 +    }
 +  }
 +
 +  /**
 +   * A map function that wraps the input string in a tuple with a randomly generated timestamp.
 +   */
 +  public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
 +  {
 +    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
 +    private final Long minTimestamp;
 +
 +    AddTimestampFn()
 +    {
 +      this.minTimestamp = System.currentTimeMillis();
 +    }
 +
 +    @Override
 +    public Tuple.TimestampedTuple<String> f(String input)
 +    {
 +      // Generate a timestamp that falls within RAND_RANGE (10 minutes) of minTimestamp.
 +      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
 +      long randomTimestamp = minTimestamp + randMillis;
 +
 +      return new Tuple.TimestampedTuple<>(randomTimestamp, input);
 +    }
 +  }
 +
 +  /** A MapFunction that converts a Word and Count into a PojoEvent. */
 +  public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
 +  {
 +    @Override
 +    public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
 +    {
 +      PojoEvent row = new PojoEvent();
 +      row.setTimestamp(input.getTimestamp());
 +      row.setCount(input.getValue().getValue());
 +      row.setWord(input.getValue().getKey());
 +      return row;
 +    }
 +  }
 +
 +  /**
 +   * Populate the dag with the High-Level API.
 +   * @param dag
 +   * @param conf
 +   */
 +  @Override
 +  public void populateDAG(DAG dag, Configuration conf)
 +  {
 +    TextInput input = new TextInput();
 +    Collector collector = new Collector();
 +
 +    // Create stream from the TextInput operator.
 +    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
 +
 +        // Extract all the words from the input line of text.
 +        .flatMap(new Function.FlatMapFunction<String, String>()
 +        {
 +          @Override
 +          public Iterable<String> f(String input)
 +          {
 +            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
 +          }
 +        }, name("ExtractWords"))
 +
 +        // Wrap the word with a randomly generated timestamp.
 +        .map(new AddTimestampFn(), name("AddTimestampFn"));
 +
 +
 +    // apply window and trigger option.
 +    // TODO: change trigger option to atWaterMark when available.
 +    WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
 +        .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
 +        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
 +
 +
 +    WindowedStream<PojoEvent> wordCounts =
 +        // Perform a countByKey transformation to count the appearance of each word in every time window.
 +        windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
 +        {
 +          @Override
 +          public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
 +          {
 +            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(),
 +              new KeyValPair<String, Long>(input.getValue(), 1L));
 +          }
 +        }, name("count words"))
 +
 +        // Format the output and print out the result.
 +        .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console"));
 +
 +    wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
index 2db59b6,0000000..571a25f
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@@ -1,324 -1,0 +1,324 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.apex.malhar.stream.sample.complete;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.util.HashMap;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
++import org.apache.apex.malhar.lib.function.Function;
 +import org.apache.apex.malhar.lib.window.TriggerOption;
 +import org.apache.apex.malhar.lib.window.Tuple;
 +import org.apache.apex.malhar.lib.window.Window;
 +import org.apache.apex.malhar.lib.window.WindowOption;
 +import org.apache.apex.malhar.stream.api.ApexStream;
 +import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 +import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
 +import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 +import org.apache.commons.io.IOUtils;
 +import org.apache.hadoop.conf.Configuration;
 +
 +import com.google.common.base.Throwables;
 +
 +import com.datatorrent.api.Context.OperatorContext;
 +import com.datatorrent.api.DAG;
 +import com.datatorrent.api.DefaultInputPort;
 +import com.datatorrent.api.DefaultOutputPort;
 +import com.datatorrent.api.InputOperator;
 +import com.datatorrent.api.StreamingApplication;
 +import com.datatorrent.api.annotation.ApplicationAnnotation;
 +import com.datatorrent.common.util.BaseOperator;
 +import com.datatorrent.lib.util.KeyValPair;
 +
 +import static org.apache.apex.malhar.stream.api.Option.Options.name;
 +
 +/**
 + * An example that computes the most popular hash tags
 + * for every prefix, which can be used for auto-completion.
 + * This application is identical to TwitterAutoComplete, except that it
 + * reads from a file. It is mainly intended for local testing
 + * purposes.
 + *
 + * <p>This will update the datastore every 10 seconds based on the last
 + * 30 minutes of data received.
 + *
 + * @since 3.5.0
 + */
 +@ApplicationAnnotation(name = "AutoComplete")
 +public class AutoComplete implements StreamingApplication
 +{
 +
 +  /**
 +   * A dummy Twitter input operator. It reads from a text file containing some tweets and outputs one line
 +   * every 50 milliseconds.
 +   */
 +  public static class TweetsInput extends BaseOperator implements InputOperator
 +  {
 +    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 +    private boolean done;
 +
 +    private transient BufferedReader reader;
 +
 +    @Override
 +    public void setup(OperatorContext context)
 +    {
 +      done = false;
 +      initReader();
 +    }
 +
 +    private void initReader()
 +    {
 +      try {
 +        InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt");
 +        reader = new BufferedReader(new InputStreamReader(resourceStream));
 +      } catch (Exception ex) {
 +        throw Throwables.propagate(ex);
 +      }
 +    }
 +
 +    @Override
 +    public void teardown()
 +    {
 +      IOUtils.closeQuietly(reader);
 +    }
 +
 +    @Override
 +    public void emitTuples()
 +    {
 +      if (!done) {
 +        try {
 +          String line = reader.readLine();
 +          if (line == null) {
 +            done = true;
 +            reader.close();
 +          } else {
 +            this.output.emit(line);
 +          }
 +          Thread.sleep(50);
 +        } catch (IOException ex) {
 +          throw new RuntimeException(ex);
 +        } catch (InterruptedException e) {
 +          // Ignore it.
 +        }
 +      }
 +    }
 +  }
 +
 +  public static class Collector extends BaseOperator
 +  {
 +    private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
 +    private static boolean done = false;
 +
 +    public static boolean isDone()
 +    {
 +      return done;
 +    }
 +
 +    @Override
 +    public void setup(OperatorContext context)
 +    {
 +      super.setup(context);
 +      done = false;
 +    }
 +
 +    public static Map<String, List<CompletionCandidate>> getResult()
 +    {
 +      return result;
 +    }
 +
 +    public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
 +    {
 +      @Override
 +      public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple)
 +      {
 +        if (tuple.getValue().getKey().equals("yarn")) {
 +          done = true;
 +        }
 +        result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
 +      }
 +    };
 +  }
 +
 +  /**
 +   * FlatMap function to extract all hashtags from the text of a tweet.
 +   */
 +  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
 +  {
 +
 +    @Override
 +    public Iterable<String> f(String input)
 +    {
 +      List<String> result = new LinkedList<>();
 +      Matcher m = Pattern.compile("#\\S+").matcher(input);
 +      while (m.find()) {
 +        result.add(m.group().substring(1));
 +      }
 +      return result;
 +    }
 +  }
 +
 +  /**
 +   * Lower latency, but more expensive.
 +   */
 +  private static class ComputeTopFlat
 +      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
 +  {
 +    private final int candidatesPerPrefix;
 +    private final int minPrefix;
 +
 +    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
 +    {
 +      this.candidatesPerPrefix = candidatesPerPrefix;
 +      this.minPrefix = minPrefix;
 +    }
 +
 +    @Override
 +    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
 +        WindowedStream<CompletionCandidate> input)
 +    {
 +      return input
 +        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix))
 +        .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String,
 +          CompletionCandidate>()
 +        {
 +          @Override
 +          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
 +          {
 +            // TODO: Should be removed after Auto-wrapping is supported.
 +            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
 +          }
 +        });
 +    }
 +  }
 +
 +  /**
 +   * FlatMap function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
 +   * KeyValPairs of each prefix and the CompletionCandidate.
 +   */
 +  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
 +  {
 +    private final int minPrefix;
 +    private final int maxPrefix;
 +
 +    public AllPrefixes()
 +    {
 +      this(0, Integer.MAX_VALUE);
 +    }
 +
 +    public AllPrefixes(int minPrefix)
 +    {
 +      this(minPrefix, Integer.MAX_VALUE);
 +    }
 +
 +    public AllPrefixes(int minPrefix, int maxPrefix)
 +    {
 +      this.minPrefix = minPrefix;
 +      this.maxPrefix = maxPrefix;
 +    }
 +
 +    @Override
 +    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
 +    {
 +      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
 +      String word = input.getValue();
 +      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
 +
 +        result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
 +      }
 +      return result;
 +    }
 +  }
 +
 +  /**
 +   * A Composite stream transform that takes as input a list of tokens and returns
 +   * the most common tokens per prefix.
 +   */
 +  public static class ComputeTopCompletions
 +      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
 +  {
 +    private final int candidatesPerPrefix;
 +    private final boolean recursive;
 +
 +    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
 +    {
 +      this.candidatesPerPrefix = candidatesPerPrefix;
 +      this.recursive = recursive;
 +    }
 +
 +    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
 +    {
 +      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
 +    }
 +
 +    @Override
 +    @SuppressWarnings("unchecked")
 +    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
 +    {
 +      ApexStream<CompletionCandidate> candidates = inputStream
 +          .countByKey(new Function.ToKeyValue<String, String, Long>()
 +          {
 +            @Override
 +            public Tuple<KeyValPair<String, Long>> f(String input)
 +            {
 +              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
 +            }
 +          }, name("countByKey"))
 +          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
 +          {
 +            @Override
 +            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
 +            {
 +              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
 +            }
 +          }, name("ToCompletionCandidate"));
 +
 +      return candidates.addCompositeStreams(new ComputeTopFlat(10, 1));
 +
 +    }
 +  }
 +
 +  /**
 +   * Populate the dag with the High-Level API.
 +   * @param dag
 +   * @param conf
 +   */
 +  @Override
 +  public void populateDAG(DAG dag, Configuration conf)
 +  {
 +    TweetsInput input = new TweetsInput();
 +    Collector collector = new Collector();
 +
 +    WindowOption windowOption = new WindowOption.GlobalWindow();
 +
 +    ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler"))
 +        .flatMap(new ExtractHashtags());
 +
 +    tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
 +        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console"))
 +        .endWith(collector, collector.input, name("collector"))
 +        .populateDag(dag);
 +  }
 +}
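
To make the prefix fan-out of AllPrefixes above concrete, a hand-worked trace
(the hashtag "apache" and count 42 are illustrative values, not from this commit):

    // AllPrefixes(1).f(new CompletionCandidate("apache", 42)) emits one
    // KeyValPair per prefix, keyed on the lowercased prefix:
    //   ("a", apache:42), ("ap", apache:42), ("apa", apache:42),
    //   ("apac", apache:42), ("apach", apache:42), ("apache", apache:42)
    // accumulateByKey(new TopNByKey(), ...) then keeps the top candidates
    // for each prefix key.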

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
index 07f01d0,0000000..b5e491e
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@@ -1,162 -1,0 +1,162 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.apex.malhar.stream.sample.complete;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +
++import org.apache.apex.malhar.lib.function.Function;
 +import org.apache.apex.malhar.stream.api.ApexStream;
 +import org.apache.apex.malhar.stream.api.Option;
- import org.apache.apex.malhar.stream.api.function.Function;
 +import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 +import org.apache.hadoop.conf.Configuration;
 +
 +import com.datatorrent.api.DAG;
 +import com.datatorrent.api.StreamingApplication;
 +import com.datatorrent.api.annotation.ApplicationAnnotation;
 +import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
 +import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
 +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
 +
 +import static java.sql.Types.VARCHAR;
 +
 +/**
 + * Beam StreamingWordExtract Example.
 + *
 + * @since 3.5.0
 + */
 +@ApplicationAnnotation(name = "StreamingWordExtract")
 +public class StreamingWordExtract implements StreamingApplication
 +{
 +  private static int wordCount = 0; // Counts the number of words that have been extracted.
 +  private static int entriesMapped = 0; // Counts the number of entries that have been mapped.
 +
 +  public int getWordCount()
 +  {
 +    return wordCount;
 +  }
 +
 +  public int getEntriesMapped()
 +  {
 +    return entriesMapped;
 +  }
 +
 +  /**
 +   * A FlatMapFunction that tokenizes lines of text into individual words.
 +   */
 +  public static class ExtractWords implements Function.FlatMapFunction<String, String>
 +  {
 +    @Override
 +    public Iterable<String> f(String input)
 +    {
 +      List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+")));
 +      wordCount += result.size();
 +      return result;
 +    }
 +  }
 +
 +
 +  /**
 +   * A MapFunction that uppercases a word.
 +   */
 +  public static class Uppercase implements Function.MapFunction<String, String>
 +  {
 +    @Override
 +    public String f(String input)
 +    {
 +      return input.toUpperCase();
 +    }
 +  }
 +
 +
 +  /**
 +   * A filter function to filter out empty strings.
 +   */
 +  public static class EmptyStringFilter implements Function.FilterFunction<String>
 +  {
 +    @Override
 +    public boolean f(String input)
 +    {
 +      return !input.isEmpty();
 +    }
 +  }
 +
 +
 +  /**
 +   * A map function to map the result string to a pojo entry.
 +   */
 +  public static class PojoMapper implements Function.MapFunction<String, Object>
 +  {
 +
 +    @Override
 +    public Object f(String input)
 +    {
 +      PojoEvent pojo = new PojoEvent();
 +      pojo.setStringValue(input);
 +      entriesMapped++;
 +      return pojo;
 +    }
 +  }
 +
 +  /**
 +   * Add field infos to the {@link JdbcPOJOInsertOutputOperator}.
 +   */
 +  private static List<JdbcFieldInfo> addFieldInfos()
 +  {
 +    List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
 +    fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
 +    return fieldInfos;
 +  }
 +
 +  /**
 +   * Populate the dag with the High-Level API.
 +   * @param dag
 +   * @param conf
 +   */
 +  @Override
 +  public void populateDAG(DAG dag, Configuration conf)
 +  {
 +    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
 +    jdbcOutput.setFieldInfos(addFieldInfos());
 +    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
 +    jdbcOutput.setStore(outputStore);
 +    jdbcOutput.setTablename("TestTable");
 +
 +    // Create a stream reading from a folder.
 +    ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
 +
 +    // Extract all the words from the input line of text.
 +    stream.flatMap(new ExtractWords())
 +
 +        // Filter out the empty strings.
 +        .filter(new EmptyStringFilter())
 +
 +        // Change every word to uppercase.
 +        .map(new Uppercase())
 +
 +        // Map the resulting word to a Pojo entry.
 +        .map(new PojoMapper())
 +
 +        // Output the entries to JdbcOutput and insert them into a table.
 +        .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
 +
 +    stream.populateDag(dag);
 +  }
 +}
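
One wiring note on StreamingWordExtract above: populateDAG() sets the field
infos and table name on the JdbcPOJOInsertOutputOperator, but the
JdbcTransactionalStore still needs connection settings before the application
can run. A sketch assuming the standard JdbcStore setters; the driver and URL
below are placeholders, not values from this commit:

    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
    // Placeholder connection settings -- substitute a real driver and URL,
    // or supply them through the application configuration instead.
    outputStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
    outputStore.setDatabaseUrl("jdbc:hsqldb:mem:test");
    jdbcOutput.setStore(outputStore);
    jdbcOutput.setTablename("TestTable");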

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index 68ec733,0000000..b2b9ae4
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@@ -1,347 -1,0 +1,347 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.apex.malhar.stream.sample.complete;
 +
 +import java.util.ArrayList;
 +import java.util.Comparator;
 +import java.util.List;
 +import java.util.Random;
 +import javax.annotation.Nullable;
 +
 +import org.joda.time.Duration;
 +
++import org.apache.apex.malhar.lib.function.Function;
 +import org.apache.apex.malhar.lib.window.TriggerOption;
 +import org.apache.apex.malhar.lib.window.Tuple;
 +import org.apache.apex.malhar.lib.window.Window;
 +import org.apache.apex.malhar.lib.window.WindowOption;
 +import org.apache.apex.malhar.lib.window.accumulation.TopN;
 +import org.apache.apex.malhar.stream.api.ApexStream;
 +import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 +import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
 +import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 +import org.apache.hadoop.conf.Configuration;
 +
 +
 +import com.datatorrent.api.Context;
 +import com.datatorrent.api.DAG;
 +import com.datatorrent.api.DefaultInputPort;
 +import com.datatorrent.api.DefaultOutputPort;
 +import com.datatorrent.api.InputOperator;
 +import com.datatorrent.api.StreamingApplication;
 +import com.datatorrent.api.annotation.ApplicationAnnotation;
 +import com.datatorrent.common.util.BaseOperator;
 +import com.datatorrent.lib.util.KeyValPair;
 +
 +import static org.apache.apex.malhar.stream.api.Option.Options.name;
 +
 +/**
 + * Beam's TopWikipediaSessions Example.
 + *
 + * @since 3.5.0
 + */
 +@ApplicationAnnotation(name = "TopWikipediaSessions")
 +public class TopWikipediaSessions implements StreamingApplication
 +{
 +  /**
 +   * A generator that outputs a stream of (user, randomly generated edit time) pairs.
 +   */
 +  public static class SessionGen extends BaseOperator implements InputOperator
 +  {
 +    private String[] names = new String[]{"user1", "user2", "user3", "user4"};
 +    public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
 +
 +    private static final Duration RAND_RANGE = Duration.standardDays(365);
 +    private Long minTimestamp;
 +    private long sleepTime;
 +    private static int tupleCount = 0;
 +
 +    public static int getTupleCount()
 +    {
 +      return tupleCount;
 +    }
 +
 +    private String randomName(String[] names)
 +    {
 +      int index = new Random().nextInt(names.length);
 +      return names[index];
 +    }
 +
 +    @Override
 +    public void setup(Context.OperatorContext context)
 +    {
 +      super.setup(context);
 +      tupleCount = 0;
 +      minTimestamp = System.currentTimeMillis();
 +      sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
 +    }
 +
 +    @Override
 +    public void emitTuples()
 +    {
 +      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
 +      long randomTimestamp = minTimestamp + randMillis;
 +      output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp));
 +      tupleCount++;
 +      try {
 +        Thread.sleep(sleepTime);
 +      } catch (InterruptedException e) {
 +        // Ignore it.
 +      }
 +    }
 +  }
 +
 +  public static class Collector extends BaseOperator
 +  {
 +    private final int resultSize = 5;
 +    private static List<List<TempWrapper>> result = new ArrayList<>();
 +
 +    public static List<List<TempWrapper>> getResult()
 +    {
 +      return result;
 +    }
 +
 +    public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
 +    {
 +      @Override
 +      public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple)
 +      {
 +        if (result.size() == resultSize) {
 +          result.remove(0);
 +        }
 +        result.add(tuple.getValue());
 +      }
 +    };
 +  }
 +
 +
 +  /**
 +   * Convert the upstream (user, time) combination to a timestamped tuple of the user.
 +   */
 +  static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>>
 +  {
 +    @Override
 +    public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input)
 +    {
 +      long timestamp = input.getValue();
 +      String userName = input.getKey();
 +
 +      // Sets the implicit timestamp field to be used in windowing.
 +      return new Tuple.TimestampedTuple<>(timestamp, userName);
 +
 +    }
 +  }
 +
 +  /**
 +   * Computes the number of edits in each user session.  A session is defined as
 +   * a string of edits where each is separated from the next by less than an hour.
 +   */
 +  static class ComputeSessions
 +      extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>>
 +  {
 +    @Override
 +    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
 +    {
 +      return inputStream
 +
 +        // Chunk the stream into session windows.
 +        .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
 +
 +        // Count the number of edits for a user within one session.
 +        .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
 +        {
 +          @Override
 +          public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
 +          {
 +            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L));
 +          }
 +        }, name("ComputeSessions"));
 +    }
 +  }
 +
 +  /**
 +   * A comparator class used for comparing two TempWrapper objects.
 +   */
 +  public static class Comp implements Comparator<TempWrapper>
 +  {
 +    @Override
 +    public int compare(TempWrapper o1, TempWrapper o2)
 +    {
 +      return Long.compare(o1.getValue().getValue(), o2.getValue().getValue());
 +    }
 +  }
 +
 +  /**
 +   * A function to extract timestamp from a TempWrapper object.
 +   */
 +  // TODO: Need to revisit and change back to using TimestampedTuple.
 +  public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long>
 +  {
 +    @Override
 +    public Long apply(@Nullable TempWrapper input)
 +    {
 +      return input.getTimestamp();
 +    }
 +  }
 +
 +  /**
 +   * A temporary wrapper that wraps a KeyValPair and a timestamp together to represent a timestamped tuple. The
 +   * reason for this is that we cannot resolve a type conflict when calling accumulate(). After the issue is
 +   * resolved, we can remove this class.
 +   */
 +  public static class TempWrapper
 +  {
 +    private KeyValPair<String, Long> value;
 +    private Long timestamp;
 +
 +    public TempWrapper()
 +    {
 +
 +    }
 +
 +    public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
 +    {
 +      this.value = value;
 +      this.timestamp = timestamp;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +      return this.value + "  -  " + this.timestamp;
 +    }
 +
 +    public Long getTimestamp()
 +    {
 +      return timestamp;
 +    }
 +
 +    public void setTimestamp(Long timestamp)
 +    {
 +      this.timestamp = timestamp;
 +    }
 +
 +    public KeyValPair<String, Long> getValue()
 +    {
 +      return value;
 +    }
 +
 +    public void setValue(KeyValPair<String, Long> value)
 +    {
 +      this.value = value;
 +    }
 +  }
 +
 +  /**
 +   * Computes the longest session ending in each month; in this case we use 30 days to represent a month.
 +   */
 +  private static class TopPerMonth
 +      extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
 +  {
 +
 +    @Override
 +    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
 +    {
 +      TopN<TempWrapper> topN = new TopN<>();
 +      topN.setN(10);
 +      topN.setComparator(new Comp());
 +
 +      return inputStream
 +
 +        // Map the input WindowedTuple to a TempWrapper object.
 +        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
 +        {
 +          @Override
 +          public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
 +          {
 +            Window window = input.getWindows().iterator().next();
 +            return new TempWrapper(input.getValue(), window.getBeginTimestamp());
 +          }
 +        }, name("TempWrapper"))
 +
 +        // Apply window and trigger option again, this time chunking the stream into fixed time windows.
 +        .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
 +
 +        // Compute the top 10 user-sessions with the most edits.
 +        .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
 +    }
 +  }
 +
 +  /**
 +   * A map function that combines the user and his/her edit session into a string, and uses that string as the key
 +   * and the number of edits in that session as the value to create a new key-value pair to send downstream.
 +   */
 +  static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>>
 +  {
 +    @Override
 +    public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
 +    {
 +      Window window = input.getWindows().iterator().next();
 +      return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>(
 +        input.getValue().getKey()  + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(),
 +        input.getValue().getValue()));
 +    }
 +  }
 +
 +  /**
 +   * A flatmap function that turns the result into a readable format.
 +   */
 +  static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
 +  {
 +    @Override
 +    public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input)
 +    {
 +      ArrayList<String> result = new ArrayList<>();
 +      for (TempWrapper item : input.getValue()) {
 +        String session = item.getValue().getKey();
 +        long count = item.getValue().getValue();
 +        Window window = input.getWindows().iterator().next();
 +        result.add(session + " + " + count + " : " + window.getBeginTimestamp());
 +      }
 +      return result;
 +    }
 +  }
 +
 +  /**
 +   * A composite transform that computes the top Wikipedia sessions.
 +   */
 +  public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
 +  {
 +    @Override
 +    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream)
 +    {
 +      return inputStream
 +        .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp"))
 +        .addCompositeStreams(new ComputeSessions())
 +        .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn"))
 +        .addCompositeStreams(new TopPerMonth());
 +    }
 +  }
 +
 +  @Override
 +  public void populateDAG(DAG dag, Configuration conf)
 +  {
 +    SessionGen sg = new SessionGen();
 +    Collector collector = new Collector();
 +    StreamFactory.fromInput(sg, sg.output, name("sessionGen"))
 +      .addCompositeStreams(new ComputeTopSessions())
 +      .print(name("console"))
 +      .endWith(collector, collector.input, name("collector")).populateDag(dag);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index e6a53d6,0000000..431263a
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@@ -1,523 -1,0 +1,523 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.apex.malhar.stream.sample.complete;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Hashtable;
 +import java.util.List;
 +import java.util.Map;
 +
 +import javax.annotation.Nullable;
 +
 +import org.joda.time.Duration;
 +
++import org.apache.apex.malhar.lib.function.Function;
 +import org.apache.apex.malhar.lib.window.TriggerOption;
 +import org.apache.apex.malhar.lib.window.Tuple;
 +import org.apache.apex.malhar.lib.window.WindowOption;
 +import org.apache.apex.malhar.lib.window.accumulation.Group;
 +import org.apache.apex.malhar.stream.api.ApexStream;
 +import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 +import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
 +import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 +import org.apache.hadoop.conf.Configuration;
 +
 +import com.google.common.collect.Lists;
 +
 +import com.datatorrent.api.Context;
 +import com.datatorrent.api.DAG;
 +import com.datatorrent.api.DefaultInputPort;
 +import com.datatorrent.api.DefaultOutputPort;
 +import com.datatorrent.api.InputOperator;
 +import com.datatorrent.api.StreamingApplication;
 +import com.datatorrent.api.annotation.ApplicationAnnotation;
 +import com.datatorrent.common.util.BaseOperator;
 +import com.datatorrent.lib.util.KeyValPair;
 +
 +import static org.apache.apex.malhar.stream.api.Option.Options.name;
 +
 +/**
 + * Beam's TrafficRoutes example.
 + *
 + * @since 3.5.0
 + */
 +@ApplicationAnnotation(name = "TrafficRoutes")
 +public class TrafficRoutes implements StreamingApplication
 +{
 +  static Map<String, String> sdStations = buildStationInfo();
 +  static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
 +  static final int WINDOW_SLIDE_EVERY = 1;  // Default window 'slide every' setting in minutes
 +
 +  /**
 +   * This class holds information about a station reading's average speed.
 +   */
 +  public static class StationSpeed implements Comparable<StationSpeed>
 +  {
 +    @Nullable
 +    String stationId;
 +    @Nullable
 +    Double avgSpeed;
 +    @Nullable
 +    Long timestamp;
 +
 +    public StationSpeed() {}
 +
 +    public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
 +    {
 +      this.stationId = stationId;
 +      this.avgSpeed = avgSpeed;
 +      this.timestamp = timestamp;
 +    }
 +
 +    public void setAvgSpeed(@Nullable Double avgSpeed)
 +    {
 +      this.avgSpeed = avgSpeed;
 +    }
 +
 +    public void setStationId(@Nullable String stationId)
 +    {
 +      this.stationId = stationId;
 +    }
 +
 +    public void setTimestamp(@Nullable Long timestamp)
 +    {
 +      this.timestamp = timestamp;
 +    }
 +
 +    @Nullable
 +    public Long getTimestamp()
 +    {
 +      return timestamp;
 +    }
 +
 +    public String getStationId()
 +    {
 +      return this.stationId;
 +    }
 +
 +    public Double getAvgSpeed()
 +    {
 +      return this.avgSpeed;
 +    }
 +
 +    @Override
 +    public int compareTo(StationSpeed other)
 +    {
 +      return Long.compare(this.timestamp, other.timestamp);
 +    }
 +  }
 +
 +  /**
 +   * This class holds information about a route's speed/slowdown.
 +   */
 +  static class RouteInfo
 +  {
 +    @Nullable
 +    String route;
 +    @Nullable
 +    Double avgSpeed;
 +    @Nullable
 +    Boolean slowdownEvent;
 +
 +    public RouteInfo()
 +    {
 +
 +    }
 +
 +    public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
 +    {
 +      this.route = route;
 +      this.avgSpeed = avgSpeed;
 +      this.slowdownEvent = slowdownEvent;
 +    }
 +
 +    public String getRoute()
 +    {
 +      return this.route;
 +    }
 +
 +    public Double getAvgSpeed()
 +    {
 +      return this.avgSpeed;
 +    }
 +
 +    public Boolean getSlowdownEvent()
 +    {
 +      return this.slowdownEvent;
 +    }
 +  }
 +
 +  /**
 +   * Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple}
 +   * with the extracted timestamp.
 +   */
 +  static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
 +  {
 +
 +    @Override
 +    public Tuple.TimestampedTuple<String> f(String input)
 +    {
 +      String[] items = input.split(",");
 +      String timestamp = tryParseTimestamp(items);
 +
 +      return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input);
 +    }
 +  }
 +
 +  /**
 +   * Filter out readings for the stations along predefined 'routes', and output
 +   * (station, speed info) keyed on route.
 +   */
 +  static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
 +  {
 +
 +    @Override
 +    public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
 +    {
 +
 +      ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>();
 +      String[] items = input.getValue().split(",");
 +      String stationType = tryParseStationType(items);
 +      // For this analysis, use only 'main line' station types
 +      if (stationType != null && stationType.equals("ML")) {
 +        Double avgSpeed = tryParseAvgSpeed(items);
 +        String stationId = tryParseStationId(items);
 +        // For this simple example, filter out everything but some hardwired routes.
 +        if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
 +          StationSpeed stationSpeed =
 +              new StationSpeed(stationId, avgSpeed, input.getTimestamp());
 +          // The tuple key is the 'route' name stored in the 'sdStations' hash.
 +          KeyValPair<String, StationSpeed> outputValue = new KeyValPair<>(sdStations.get(stationId), stationSpeed);
 +          result.add(outputValue);
 +        }
 +      }
 +      return result;
 +    }
 +  }
 +
 +  /**
 +   * For a given route, track the average speed over the window. Calculate whether
 +   * traffic is currently slowing down, via a predefined threshold: if a supermajority of
 +   * speeds in this sliding window are less than the previous reading, we call this a 'slowdown'.
 +   * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
 +   */
 +  static class GatherStats
 +      implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
 +  {
 +    @Override
 +    public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
 +    {
 +      ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> result = new ArrayList<>();
 +      String route = input.getValue().getKey();
 +      double speedSum = 0.0;
 +      int speedCount = 0;
 +      int speedups = 0;
 +      int slowdowns = 0;
 +      List<StationSpeed> infoList = Lists.newArrayList(input.getValue().getValue());
 +      // StationSpeeds sort by embedded timestamp.
 +      Collections.sort(infoList);
 +      Map<String, Double> prevSpeeds = new HashMap<>();
 +      // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
 +      for (StationSpeed item : infoList) {
 +        Double speed = item.getAvgSpeed();
 +        if (speed != null) {
 +          speedSum += speed;
 +          speedCount++;
 +          Double lastSpeed = prevSpeeds.get(item.getStationId());
 +          if (lastSpeed != null) {
 +            if (lastSpeed < speed) {
 +              speedups += 1;
 +            } else {
 +              slowdowns += 1;
 +            }
 +          }
 +          prevSpeeds.put(item.getStationId(), speed);
 +        }
 +      }
 +      if (speedCount == 0) {
 +        // No average to compute.
 +        return result;
 +      }
 +      double speedAvg = speedSum / speedCount;
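 +      // 'Supermajority' here means at least twice as many slowdowns as speedups in the window.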
 +      boolean slowdownEvent = slowdowns >= 2 * speedups;
 +      RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
 +      result.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, routeInfo)));
 +      return result;
 +    }
 +  }
 +
 +  /**
 +   * Output POJO class for writing results to JDBC.
 +   */
 +  static class OutputPojo
 +  {
 +    private Double avgSpeed;
 +    private Boolean slowdownEvent;
 +    private String key;
 +    private Long timestamp;
 +
 +    public OutputPojo()
 +    {
 +    }
 +
 +    public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
 +    {
 +      this.avgSpeed = avgSpeed;
 +      this.slowdownEvent = slowdownEvent;
 +      this.key = key;
 +      this.timestamp = timestamp;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +      return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
 +    }
 +
 +    public void setTimestamp(Long timestamp)
 +    {
 +      this.timestamp = timestamp;
 +    }
 +
 +    public Long getTimestamp()
 +    {
 +      return timestamp;
 +    }
 +
 +    public void setAvgSpeed(Double avgSpeed)
 +    {
 +      this.avgSpeed = avgSpeed;
 +    }
 +
 +    public Double getAvgSpeed()
 +    {
 +      return avgSpeed;
 +    }
 +
 +    public void setKey(String key)
 +    {
 +      this.key = key;
 +    }
 +
 +    public String getKey()
 +    {
 +      return key;
 +    }
 +
 +    public void setSlowdownEvent(Boolean slowdownEvent)
 +    {
 +      this.slowdownEvent = slowdownEvent;
 +    }
 +
 +    public Boolean getSlowdownEvent()
 +    {
 +      return slowdownEvent;
 +    }
 +
 +  }
 +
 +  public static class Collector extends BaseOperator
 +  {
 +    private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
 +
 +    public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
 +    {
 +      return result;
 +    }
 +
 +    public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
 +    {
 +      @Override
 +      public void process(OutputPojo tuple)
 +      {
 +        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent()));
 +      }
 +    };
 +  }
 +
 +  /**
 +   * Format the results of the slowdown calculations into an OutputPojo.
 +   */
 +  static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
 +  {
 +    @Override
 +    public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
 +    {
 +      RouteInfo routeInfo = input.getValue().getValue();
 +      OutputPojo row = new OutputPojo(routeInfo.getAvgSpeed(), routeInfo.getSlowdownEvent(), input.getValue().getKey(), input.getTimestamp());
 +      return row;
 +    }
 +  }
 +
 +
 +  /**
 +   * This composite transformation extracts speed info from traffic station readings.
 +   * It groups the readings by 'route' and analyzes traffic slowdown for that route.
 +   * Lastly, it formats the results for JDBC.
 +   */
 +  static class TrackSpeed extends
 +      CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
 +  {
 +    @Override
 +    public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
 +    {
 +      // Apply a GroupByKey transform to collect a list of all station
 +      // readings for a given route.
 +      WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup =
 +          inputStream
 +          .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
 +          {
 +            @Override
 +            public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
 +            {
 +              return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
 +            }
 +          }, name("GroupByKey"));
 +
 +      // Analyze 'slowdown' over the route readings.
 +      WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
 +          .flatMap(new GatherStats(), name("GatherStats"));
 +
 +      // Format the results for writing to JDBC table.
 +      WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));
 +
 +      return results;
 +    }
 +  }
 +
 +
 +  private static Double tryParseAvgSpeed(String[] inputItems)
 +  {
 +    try {
 +      return Double.parseDouble(tryParseString(inputItems, 3));
 +    } catch (NumberFormatException e) {
 +      return null;
 +    } catch (NullPointerException e) {
 +      return null;
 +    }
 +  }
 +
 +  private static String tryParseStationType(String[] inputItems)
 +  {
 +    return tryParseString(inputItems, 2);
 +  }
 +
 +  private static String tryParseStationId(String[] inputItems)
 +  {
 +    return tryParseString(inputItems, 1);
 +  }
 +
 +  private static String tryParseTimestamp(String[] inputItems)
 +  {
 +    return tryParseString(inputItems, 0);
 +  }
 +
 +  private static String tryParseString(String[] inputItems, int index)
 +  {
 +    // Use '>' rather than '>=' so short rows cannot cause an ArrayIndexOutOfBoundsException.
 +    return inputItems.length > index ? inputItems[index] : null;
 +  }
 +
 +  /**
 +   * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
 +   */
 +  private static Map<String, String> buildStationInfo()
 +  {
 +    Map<String, String> stations = new Hashtable<String, String>();
 +    stations.put("1108413", "SDRoute1"); // from freeway 805 S
 +    stations.put("1108699", "SDRoute2"); // from freeway 78 E
 +    stations.put("1108702", "SDRoute2");
 +    return stations;
 +  }
 +
 +  /**
 +   * A dummy generator to generate some traffic information.
 +   */
 +  public static class InfoGen extends BaseOperator implements InputOperator
 +  {
 +    public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
 +
 +    private String[] stationTypes = new String[]{"ML", "BL", "GL"};
 +    private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
 +    private double ave = 55.0;
 +    private long timestamp;
 +    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
 +    private static int tupleCount = 0;
 +
 +    public static int getTupleCount()
 +    {
 +      return tupleCount;
 +    }
 +
 +    @Override
 +    public void setup(Context.OperatorContext context)
 +    {
 +      tupleCount = 0;
 +      timestamp = System.currentTimeMillis();
 +    }
 +
 +    @Override
 +    public void emitTuples()
 +    {
 +      for (String stationType : stationTypes) {
 +        for (int stationID : stationIDs) {
 +          double speed = Math.random() * 20 + ave;
 +          long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
 +          try {
 +            output.emit(time + "," + stationID + "," + stationType + "," + speed);
 +            tupleCount++;
 +
 +            Thread.sleep(50);
 +          } catch (Exception e) {
 +            // Ignore it
 +          }
 +        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void populateDAG(DAG dag, Configuration conf)
 +  {
 +    InfoGen infoGen = new InfoGen();
 +    Collector collector = new Collector();
 +
 +    // Create a stream from the input operator.
 +    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
 +
 +        // Extract the timestamp from the input and wrap it into a TimestampedTuple.
 +        .map(new ExtractTimestamps(), name("ExtractTimestamps"));
 +
 +    stream
 +        // Extract the average speed of a station.
 +        .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
 +
 +        // Apply window and trigger option.
 +        .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
 +
 +        // Apply TrackSpeed composite transformation to compute the route information.
 +        .addCompositeStreams(new TrackSpeed())
 +
 +        // Print the result to the console.
 +        .print(name("console"))
 +        .endWith(collector, collector.input, name("Collector"))
 +        .populateDag(dag);
 +  }
 +}
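
For local experimentation, this application can be launched in embedded mode. Below is a minimal sketch (illustrative, not part of this commit) assuming the standard Apex LocalMode API; the driver class name and run duration are arbitrary:

    // Hypothetical local-mode driver for TrafficRoutes; adjust the package and imports to your project.
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.LocalMode;

    public class RunTrafficRoutesLocally
    {
      public static void main(String[] args) throws Exception
      {
        LocalMode lma = LocalMode.newInstance();
        Configuration conf = new Configuration(false);
        lma.prepareDAG(new TrafficRoutes(), conf); // builds the DAG via populateDAG()
        LocalMode.Controller lc = lma.getController();
        lc.runAsync();                             // run the DAG in-process
        Thread.sleep(60 * 1000);                   // let it process for a minute
        lc.shutdown();
      }
    }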

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
index 6332c66,0000000..cf52cb6
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@@ -1,254 -1,0 +1,254 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.apex.malhar.stream.sample.complete;
 +
 +import java.nio.charset.Charset;
 +import java.nio.charset.CharsetEncoder;
 +
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +import org.joda.time.Duration;
 +
++import org.apache.apex.malhar.lib.function.Function;
 +import org.apache.apex.malhar.lib.window.TriggerOption;
 +import org.apache.apex.malhar.lib.window.Tuple;
 +import org.apache.apex.malhar.lib.window.Window;
 +import org.apache.apex.malhar.lib.window.WindowOption;
 +
 +import org.apache.apex.malhar.stream.api.ApexStream;
 +import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 +import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
 +import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 +
 +import org.apache.hadoop.conf.Configuration;
 +
 +import com.datatorrent.api.DAG;
 +import com.datatorrent.api.StreamingApplication;
 +import com.datatorrent.api.annotation.ApplicationAnnotation;
 +import com.datatorrent.contrib.twitter.TwitterSampleInput;
 +import com.datatorrent.lib.util.KeyValPair;
 +
 +import static org.apache.apex.malhar.stream.api.Option.Options.name;
 +
 +/**
 + * Auto Complete Hashtag Example with real-time Twitter input. In order to run this application, you need to create an app
 + * at https://apps.twitter.com, generate your consumer and access keys and tokens, and enter that information
 + * accordingly in /resources/META-INF/properties.xml.
 + *
 + * Authentication requires the following four pieces of information:
 + * your application consumer key,
 + * your application consumer secret,
 + * your Twitter access token, and
 + * your Twitter access token secret.
 + *
 + * @since 3.5.0
 + */
 +@ApplicationAnnotation(name = "TwitterAutoComplete")
 +public class TwitterAutoComplete implements StreamingApplication
 +{
 +  /**
 +   * Check whether every character in a string can be encoded as ASCII.
 +   */
 +  public static class StringUtils
 +  {
 +    static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();
 +
 +    public static boolean isAscii(String v)
 +    {
 +      return encoder.canEncode(v);
 +    }
 +  }
 +
 +  /**
 +   * FlatMap function to extract all hashtags from the text of a tweet.
 +   */
 +  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
 +  {
 +
 +    @Override
 +    public Iterable<String> f(String input)
 +    {
 +      List<String> result = new LinkedList<>();
 +      Matcher m = Pattern.compile("#\\S+").matcher(input);
 +      while (m.find()) {
 +        result.add(m.group().substring(1));
 +      }
 +      return result;
 +    }
 +  }
 +
 +  /**
 +   * Lower latency, but more expensive.
 +   */
 +  private static class ComputeTopFlat
 +      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
 +  {
 +    private final int candidatesPerPrefix;
 +    private final int minPrefix;
 +
 +    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
 +    {
 +      this.candidatesPerPrefix = candidatesPerPrefix;
 +      this.minPrefix = minPrefix;
 +    }
 +
 +    @Override
 +    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
 +        WindowedStream<CompletionCandidate> input)
 +    {
 +      TopNByKey topNByKey = new TopNByKey();
 +      topNByKey.setN(candidatesPerPrefix);
 +      return input
 +        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
 +        .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
 +        {
 +          @Override
 +          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
 +          {
 +            // TODO: Should be removed after Auto-wrapping is supported.
 +            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
 +          }
 +        }, name("TopNByKey"));
 +    }
 +  }
 +
 +  /**
 +   * FlatMap function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
 +   * KeyValPairs of each prefix and the CompletionCandidate.
 +   */
 +  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
 +  {
 +    private final int minPrefix;
 +    private final int maxPrefix;
 +
 +    public AllPrefixes()
 +    {
 +      this(0, Integer.MAX_VALUE);
 +    }
 +
 +    public AllPrefixes(int minPrefix)
 +    {
 +      this(minPrefix, Integer.MAX_VALUE);
 +    }
 +
 +    public AllPrefixes(int minPrefix, int maxPrefix)
 +    {
 +      this.minPrefix = minPrefix;
 +      this.maxPrefix = maxPrefix;
 +    }
 +
 +    @Override
 +    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
 +    {
 +      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
 +      String word = input.getValue();
 +      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
 +        result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
 +      }
 +      return result;
 +    }
 +  }
 +
 +  /**
 +   * A Composite stream transform that takes as input a list of tokens and returns
 +   * the most common tokens per prefix.
 +   */
 +  public static class ComputeTopCompletions
 +      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
 +  {
 +    private final int candidatesPerPrefix;
 +    private final boolean recursive;
 +
 +    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
 +    {
 +      this.candidatesPerPrefix = candidatesPerPrefix;
 +      this.recursive = recursive;
 +    }
 +
 +    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
 +    {
 +      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
 +    }
 +
 +    @Override
 +    @SuppressWarnings("unchecked")
 +    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
 +    {
 +
 +      ApexStream<CompletionCandidate> candidates = inputStream
 +          .countByKey(new Function.ToKeyValue<String, String, Long>()
 +          {
 +            @Override
 +            public Tuple<KeyValPair<String, Long>> f(String input)
 +            {
 +              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
 +            }
 +          }, name("Hashtag Count"))
 +          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
 +          {
 +            @Override
 +            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
 +            {
 +              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
 +            }
 +          }, name("KeyValPair to CompletionCandidate"));
 +
 +      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
 +
 +    }
 +  }
 +
 +  /**
 +   * FilterFunction to filter out tweets containing non-ASCII characters.
 +   */
 +  static class ASCIIFilter implements Function.FilterFunction<String>
 +  {
 +    @Override
 +    public boolean f(String input)
 +    {
 +      return StringUtils.isAscii(input);
 +    }
 +  }
 +
 +  /**
 +   * Populate the DAG using the High-Level API.
 +   * @param dag
 +   * @param conf
 +   */
 +  @Override
 +  public void populateDAG(DAG dag, Configuration conf)
 +  {
 +    TwitterSampleInput input = new TwitterSampleInput();
 +
 +    WindowOption windowOption = new WindowOption.GlobalWindow();
 +
 +    ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
 +        .filter(new ASCIIFilter(), name("ASCII Filter"))
 +        .flatMap(new ExtractHashtags(), name("Extract Hashtags"));
 +
 +    ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
 +        tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
 +        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();
 +
 +    s.populateDag(dag);
 +  }
 +}
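
The four credentials above are settings on the TwitterSampleInput operator (named "tweetSampler" in this DAG), so for a quick local test they can also be supplied programmatically. A minimal sketch, assuming the usual dt.operator.&lt;name&gt;.prop.&lt;property&gt; path convention and property names matching the operator's setters (both assumptions):

    // Hypothetical credential wiring; replace the placeholder values with your own keys.
    Configuration conf = new Configuration(false);
    conf.set("dt.operator.tweetSampler.prop.consumerKey", "<consumer key>");
    conf.set("dt.operator.tweetSampler.prop.consumerSecret", "<consumer secret>");
    conf.set("dt.operator.tweetSampler.prop.accessToken", "<access token>");
    conf.set("dt.operator.tweetSampler.prop.accessTokenSecret", "<access token secret>");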

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index bfdb268,0000000..937476e
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@@ -1,285 -1,0 +1,285 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.apex.malhar.stream.sample.cookbook;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
++import org.apache.apex.malhar.lib.function.Function;
 +import org.apache.apex.malhar.lib.window.TriggerOption;
 +import org.apache.apex.malhar.lib.window.Tuple;
 +import org.apache.apex.malhar.lib.window.WindowOption;
 +import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
 +import org.apache.apex.malhar.stream.api.ApexStream;
 +import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
 +import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
 +import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 +
 +import org.apache.hadoop.conf.Configuration;
 +
 +import com.datatorrent.api.Context;
 +import com.datatorrent.api.DAG;
 +import com.datatorrent.api.DefaultInputPort;
 +import com.datatorrent.api.DefaultOutputPort;
 +import com.datatorrent.api.InputOperator;
 +import com.datatorrent.api.StreamingApplication;
 +import com.datatorrent.api.annotation.ApplicationAnnotation;
 +import com.datatorrent.common.util.BaseOperator;
 +import com.datatorrent.lib.util.KeyValPair;
 +
 +import static org.apache.apex.malhar.stream.api.Option.Options.name;
 +
 +/**
 + * An example that reads the public 'Shakespeare' data, and for each word in
 + * the dataset that is over a given length, generates a string containing the
 + * list of play names in which that word appears.
 + *
 + * <p>Concepts: the combine transform, which lets you combine the values in a
 + * key-grouped Collection.
 + *
 + *
 + * @since 3.5.0
 + */
 +@ApplicationAnnotation(name = "CombinePerKeyExamples")
 +public class CombinePerKeyExamples implements StreamingApplication
 +{
 +  // Use the shakespeare public BigQuery sample
 +  private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
 +  // We'll track words >= this word length across all plays in the table.
 +  private static final int MIN_WORD_LENGTH = 0;
 +
 +  /**
 +   * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
 +   * outputs word, play_name.
 +   */
 +  static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>>
 +  {
 +
 +    @Override
 +    public KeyValPair<String, String> f(SampleBean input)
 +    {
 +      String playName = input.getCorpus();
 +      String word = input.getWord();
 +      if (word.length() >= MIN_WORD_LENGTH) {
 +        return new KeyValPair<>(word, playName);
 +      } else {
 +        return null;
 +      }
 +    }
 +  }
 +
 +
 +  /**
 +   * Prepares the output data, which uses the same {@link SampleBean} class.
 +   */
 +  static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean>
 +  {
 +    @Override
 +    public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
 +    {
 +      return new SampleBean(input.getValue().getKey(), input.getValue().getValue());
 +    }
 +  }
 +
 +  /**
 +   * A reduce function to concat two strings together.
 +   */
 +  public static class Concat extends ReduceFn<String>
 +  {
 +    @Override
 +    public String reduce(String input1, String input2)
 +    {
 +      return input1 + ", " + input2;
 +    }
 +  }
 +
 +  /**
 +   * Reads the public 'Shakespeare' data, and for each word in the dataset
 +   * over a given length, generates a string containing the list of play names
 +   * in which that word appears.
 +   */
 +  private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
 +  {
 +
 +    @Override
 +    public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
 +    {
 +      return inputStream
 +          // Extract words from the input SampleBean stream.
 +          .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
 +
 +          // Apply window and trigger option to the streams.
 +          .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
 +
 +          // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
 +          .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>()
 +          {
 +            @Override
 +            public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
 +            {
 +              return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
 +            }
 +          }, name("Concat"))
 +
 +          // Format the output back to a SampleBean object.
 +          .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
 +    }
 +  }
 +
 +
 +  /**
 +   * A Java bean class that contains information about a word that appears in a corpus written by Shakespeare.
 +   */
 +  public static class SampleBean
 +  {
 +
 +    public SampleBean()
 +    {
 +
 +    }
 +
 +    public SampleBean(String word, String corpus)
 +    {
 +      this.word = word;
 +      this.corpus = corpus;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +      return this.word + " : "  + this.corpus;
 +    }
 +
 +    private String word;
 +
 +    private String corpus;
 +
 +    public void setWord(String word)
 +    {
 +      this.word = word;
 +    }
 +
 +    public String getWord()
 +    {
 +      return word;
 +    }
 +
 +    public void setCorpus(String corpus)
 +    {
 +      this.corpus = corpus;
 +    }
 +
 +    public String getCorpus()
 +    {
 +      return corpus;
 +    }
 +  }
 +
 +  /**
 +   * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare'
 +   * data.
 +   */
 +  public static class SampleInput extends BaseOperator implements InputOperator
 +  {
 +
 +    public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort<>();
 +    private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
 +    private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
 +    private static int i;
 +
 +    @Override
 +    public void setup(Context.OperatorContext context)
 +    {
 +      super.setup(context);
 +      i = 0;
 +    }
 +
 +    @Override
 +    public void emitTuples()
 +    {
 +      while (i < 1) {
 +        for (String word : words) {
 +          for (String corpus : corpuses) {
 +            try {
 +              Thread.sleep(50);
 +              beanOutput.emit(new SampleBean(word, corpus));
 +            } catch (Exception e) {
 +              // Ignore it
 +            }
 +          }
 +        }
 +        i++;
 +      }
 +
 +    }
 +  }
 +
 +  public static class Collector extends BaseOperator
 +  {
 +    private static List<SampleBean> result;
 +    private static boolean done = false;
 +
 +    public static List<SampleBean> getResult()
 +    {
 +      return result;
 +    }
 +
 +    public static boolean isDone()
 +    {
 +      return done;
 +    }
 +
 +    @Override
 +    public void setup(Context.OperatorContext context)
 +    {
 +      result = new ArrayList<>();
 +      done = false;
 +    }
 +
 +    public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
 +    {
 +      @Override
 +      public void process(SampleBean tuple)
 +      {
 +        if (tuple.getWord().equals("F")) {
 +          done = true;
 +        }
 +        result.add(tuple);
 +      }
 +    };
 +  }
 +
 +  /**
 +   * Populate the DAG using the High-Level API.
 +   * @param dag
 +   * @param conf
 +   */
 +  @Override
 +  public void populateDAG(DAG dag, Configuration conf)
 +  {
 +    SampleInput input = new SampleInput();
 +    Collector collector = new Collector();
 +    StreamFactory.fromInput(input, input.beanOutput, name("input"))
 +      .addCompositeStreams(new PlaysForWord())
 +      .print(name("console"))
 +      .endWith(collector, collector.input, name("Collector"))
 +      .populateDag(dag);
 +
 +  }
 +}
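
Since Concat is a ReduceFn, reduceByKey folds the play names for each word pairwise. A standalone sketch of the resulting semantics (illustrative values only):

    // Shows how Concat accumulates play names for a single word key.
    Concat concat = new Concat();
    String acc = concat.reduce("king lear", "othello"); // -> "king lear, othello"
    acc = concat.reduce(acc, "macbeth");                // -> "king lear, othello, macbeth"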