You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2017/01/20 08:47:50 UTC

[10/19] incubator-gearpump git commit: merge master into akka-streams branch

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/docs/introduction/gearpump-internals.md
----------------------------------------------------------------------
diff --git a/docs/docs/introduction/gearpump-internals.md b/docs/docs/introduction/gearpump-internals.md
deleted file mode 100644
index bc7e6bf..0000000
--- a/docs/docs/introduction/gearpump-internals.md
+++ /dev/null
@@ -1,228 +0,0 @@
-### Actor Hierarchy?
-
-![Actor Hierarchy](../img/actor_hierarchy.png)
-
-Everything in the diagram is an actor; they fall into two categories, Cluster Actors and Application Actors.
-
-#### Cluster Actors
-
-  **Worker**: Maps to a physical worker machine. It is responsible for managing resources and report metrics on that machine.
-
-  **Master**: Heart of the cluster, which manages workers, resources, and applications. The main function is delegated to three child actors, App Manager, Worker Manager, and Resource Scheduler.
-
-#### Application Actors:
-
-  **AppMaster**: Responsible to schedule the tasks to workers and manage the state of the application. Different applications have different AppMaster instances and are isolated.
-
-  **Executor**: Child of AppMaster, represents a JVM process. Its job is to manage the life cycle of tasks and recover the tasks in case of failure.
-
-  **Task**: Child of Executor, does the real job. Every task actor has a global unique address. One task actor can send data to any other task actors. This gives us great flexibility of how the computation DAG is distributed.
-
-  All actors in the graph are weaved together with actor supervision, and actor watching and every error is handled properly via supervisors. In a master, a risky job is isolated and delegated to child actors, so it's more robust. In the application, an extra intermediate layer "Executor" is created so that we can do fine-grained and fast recovery in case of task failure. A master watches the lifecycle of AppMaster and worker to handle the failures, but the life cycle of Worker and AppMaster are not bound to a Master Actor by supervision, so that Master node can fail independently.  Several Master Actors form an Akka cluster, the Master state is exchanged using the Gossip protocol in a conflict-free consistent way so that there is no single point of failure. With this hierarchy design, we are able to achieve high availability.
-
-### Application Clock and Global Clock Service
-
-Global clock service will track the minimum time stamp of all pending messages in the system. Every task will update its own minimum-clock to global clock service; the minimum-clock of task is decided by the minimum of:
-
-  - Minimum time stamp of all pending messages in the inbox.
-  - Minimum time stamp of all un-acked outgoing messages. When there is message loss, the minimum clock will not advance.
-  - Minimum clock of all task states. If the state is accumulated by a lot of input messages, then the clock value is decided by the oldest message's timestamp. The state clock will advance by doing snapshots to persistent storage or by fading out the effect of old messages.
-
-![Clock](../img/clock.png)
-
-The global clock service will keep track of all task minimum clocks effectively and maintain a global view of minimum clock. The global minimum clock value is monotonically increasing; it means that all source messages before this clock value have been processed. If there is message loss or task crash, the global minimum clock will stop.
-
-### How do we optimize the message passing performance?
-
-For streaming application, message passing performance is extremely important. For example, one streaming platform may need to process millions of messages per second with millisecond level latency. High throughput and low latency is not that easy to achieve. There are a number of challenges:
-
-#### First Challenge: Network is not efficient for small messages
-
-In streaming, typical message size is very small, usually less than 100 bytes per message, like the floating car GPS data. But network efficiency is very bad when transferring small messages. As you can see in below diagram, when message size is 50 bytes, it can only use 20% bandwidth. How to improve the throughput?
-
-![Throughput vs. Message Size](../img/through_vs_message_size.png)
-
-#### Second Challenge: Message overhead is too big
-
-For each message sent between two actors, it contains sender and receiver actor path. When sending over the wire, the overhead of this ActorPath is not trivial. For example, the below actor path takes more than 200 bytes.
-
-	:::bash
-	akka.tcp://system1@192.168.1.53:51582/remote/akka.tcp/2120193a-e10b-474e-bccb-8ebc4b3a0247@192.168.1.53:48948/remote/akka.tcp/system2@192.168.1.54:43676/user/master/Worker1/app_0_executor_0/group_1_task_0#-768886794
-
-
-#### How do we solve this?
-
-We implement a custom Netty transportation layer with Akka extension. In the below diagram, Netty Client will translate ActorPath to TaskId, and Netty Server will translate it back. Only TaskId will be passed on wire, it is only about 10 bytes, the overhead is minimized. Different Netty Client Actors are isolated; they will not block each other.
-
-![Netty Transport](../img/netty_transport.png)
-
-For performance, effective batching is really the key! We group multiple messages to a single batch and send it on the wire. The batch size is not fixed; it is adjusted dynamically based on network status. If the network is available, we will flush pending messages immediately without waiting; otherwise we will put the message in a batch and trigger a timer to flush the batch later.
-
-### How do we do flow Control?
-
-Without flow control, one task can easily flood another task with too many messages, causing out of memory error. Typical flow control will use a TCP-like sliding window, so that source and target can run concurrently without blocking each other.
-
-![Flow Control](../img/flow_control.png)
-Figure: Flow control, each task is "star" connected to input tasks and output tasks
-
-The difficult part for our problem is that each task can have multiple input tasks and output tasks. The input and output must be geared together so that the back pressure can be properly propagated from downstream to upstream. The flow control also needs to consider failures, and it needs to be able to recover when there is message loss.
-Another challenge is that the overhead of flow control messages can be big. If we ack every message, there will be huge amount of acked messages in the system, degrading streaming performance. The approach we adopted is to use explicit AckRequest message. The target tasks will only ack back when they receive the AckRequest message, and the source will only send AckRequest when it feels necessary. With this approach, we can largely reduce the overhead.
-
-### How do we detect message loss?
-
-For example, for web ads, we may charge for every click, we don't want to miscount.  The streaming platform needs to effectively track what messages have been lost, and recover as fast as possible.
-
-![Message Loss](../img/messageLoss.png)
-Figure: Message Loss Detection
-
-We use the flow control message AckRequest and Ack to detect message loss. The target task will count how many messages has been received since last AckRequest, and ack the count back to source task. The source task will check the count and find message loss.
-This is just an illustration, the real case is more difficulty, we need to handle zombie tasks, and in-the-fly stale messages.
-
-### How Gearpump know what messages to replay?
-
-In some applications, a message cannot be lost, and must be replayed. For example, during the money transfer, the bank will SMS us the verification code. If that message is lost, the system must replay it so that money transfer can continue. We made the decision to use **source end message storage** and **time stamp based replay**.
-
-![Replay](../img/replay.png)
-Figure: Replay with Source End Message Store
-
-Every message is immutable, and tagged with a timestamp. We have an assumption that the timestamp is approximately incremental (allow small ratio message disorder).
-
-We assume the message is coming from a replay-able source, like Kafka queue; otherwise the message will be stored at customizable source end "message store". When the source task sends the message downstream, the timestamp and offset of the message is also check-pointed to offset-timestamp storage periodically. During recovery, the system will first retrieve the right time stamp and offset from the offset-timestamp storage, then it will replay the message store from that time stamp and offset. A Timestamp Filter will filter out old messages in case the message in message store is not strictly time-ordered.
-
-### Master High Availability
-
-In a distributed streaming system, any part can fail. The system must stay responsive and do recovery in case of errors.
-
-![HA](../img/ha.png)
-Figure: Master High Availability
-
-We use Akka clustering to implement the Master high availability. The cluster consists of several master nodes, but no worker nodes. With clustering facilities, we can easily detect and handle the failure of master node crash. The master state is replicated on all master nodes with the Typesafe akka-data-replication  library, when one master node crashes, another standby master will read the master state and take over. The master state contains the submission data of all applications. If one application dies, a master can use that state to recover that application. CRDT LwwMap  is used to represent the state; it is a hash map that can converge on distributed nodes without conflict. To have strong data consistency, the state read and write must happen on a quorum of master nodes.
-
-### How we do handle failures?
-
-With Akka's powerful actor supervision, we can implement a resilient system relatively easy. In Gearpump, different applications have a different AppMaster instance, they are totally isolated from each other. For each application, there is a supervision tree, AppMaster->Executor->Task. With this supervision hierarchy, we can free ourselves from the headache of zombie process, for example if AppMaster is down, Akka supervisor will ensure the whole tree is shutting down.
-
-There are multiple possible failure scenarios
-
-![Failures](../img/failures.png)
-Figure: Possible Failure Scenarios and Error Supervision Hierarchy
-
-#### What happens when the Master crashes?
-
-In case of a master crash, other standby masters will be notified, they will resume the master state, and take over control. Worker and AppMaster will also be notified, They will trigger a process to find the new active master, until the resolution complete. If AppMaster or Worker cannot resolve a new Master in a time out, they will make suicide and kill themselves.
-
-#### What happens when a worker crashes?
-
-In case of a worker crash, the Master will get notified and stop scheduling new computation to this worker. All supervised executors on current worker will be killed, AppMaster can treat it as recovery of executor crash like [What happen when an executor crashes?](#what-happen-when-an-executor-crashes)
-
-#### What happens when the AppMaster crashes?
-
-If an AppMaster crashes, Master will schedule a new resource to create a new AppMaster Instance elsewhere, and then the AppMaster will handle the recovery inside the application. For streaming, it will recover the latest min clock and other state from disk, request resources from master to start executors, and restart the tasks with recovered min clock.
-
-#### What happen when an executor crashes?
-
-If an executor crashes, its supervisor AppMaster will get notified, and request a new resource from the active master to start a new executor, to run the tasks which were located on the crashed executor.
-
-#### What happen when tasks crash?
-
-If a task throws an exception, its supervisor executor will restart that Task.
-
-When "at least once" message delivery is enabled, it will trigger the message replaying in the case of message loss. First AppMaster will read the latest minimum clock from the global clock service(or clock storage if the clock service crashes), then AppMaster will restart all the task actors to get a fresh task state, then the source end tasks will replay messages from that minimum clock.
-
-### How does "exactly-once" message delivery work?
-
-For some applications, it is extremely important to do "exactly once" message delivery. For example, for a real-time billing system, we will not want to bill the customer twice. The goal of "exactly once" message delivery is to make sure:
-  The error doesn't accumulate, today's error will not be accumulated to tomorrow.
-  Transparent to application developer
-We use global clock to synchronize the distributed transactions. We assume every message from the data source will have a unique timestamp, the timestamp can be a part of the message body, or can be attached later with system clock when the message is injected into the streaming system. With this global synchronized clock, we can coordinate all tasks to checkpoint at same timestamp.
-
-![Checkpoint](../img/checkpointing.png)
-Figure: Checkpointing and Exactly-Once Message delivery
-
-Workflow to do state checkpointing:
-
-1. The coordinator asks the streaming system to do checkpoint at timestamp Tc.
-2. For each application task, it will maintain two states, checkpoint state and current state. Checkpoint state only contains information before timestamp Tc. Current state contains all information.
-3. When global minimum clock is larger than Tc, it means all messages older than Tc has been processed; the checkpoint state will no longer change, so we will then persist the checkpoint state to storage safely.
-4. When there is message loss, we will start the recovery process.
-5. To recover, load the latest checkpoint state from store, and then use it to restore the application status.
-6. Data source replays messages from the checkpoint timestamp.
-
-The checkpoint interval is determined by global clock service dynamically. Each data source will track the max timestamp of input messages. Upon receiving min clock updates, the data source will report the time delta back to global clock service. The max time delta is the upper bound of the application state timespan. The checkpoint interval is bigger than max delta time:
-
-![Checkpoint Equation](../img/checkpoint_equation.png)
-
-![Checkpointing Interval](../img/checkpointing_interval.png)
-Figure: How to determine Checkpoint Interval
-
-After the checkpoint interval is notified to tasks by global clock service, each task will calculate its next checkpoint timestamp autonomously without global synchronization.
-
-![Checkpoint Interval Equation](../img/checkpoint_interval_equation.png)
-
-For each task, it contains two states, checkpoint state and current state. The code to update the state is shown in listing below.
-
-	:::python
-	TaskState(stateStore, initialTimeStamp):
-	  currentState = stateStore.load(initialTimeStamp)
-	  checkpointState = currentState.clone
-	  checkpointTimestamp = nextCheckpointTimeStamp(initialTimeStamp)
-	onMessage(msg):
-	  if (msg.timestamp < checkpointTimestamp):
-	    checkpointState.updateMessage(msg)
-	  currentState.updateMessage(msg)  
-	  maxClock = max(maxClock, msg.timeStamp)
-	
-	onMinClock(minClock):
-	  if (minClock > checkpointTimestamp):
-	    stateStore.persist(checkpointState)
-	    checkpointTimeStamp = nextCheckpointTimeStamp(maxClock)
-	    checkpointState = currentState.clone
-	
-	onNewCheckpointInterval(newStep):
-	  step = newStep  
-	nextCheckpointTimeStamp(timestamp):
-	  checkpointTimestamp = (1 + timestamp/step) * step
-	
-
-List 1: Task Transactional State Implementation
-
-### What is dynamic graph, and how it works?
-
-The DAG can be modified dynamically. We want to be able to dynamically add, remove, and replace a sub-graph.
-
-![Dynamic DAG](../img/dynamic.png)
-Figure: Dynamic Graph, Attach, Replace, and Remove
-
-## At least once message delivery and Kafka
-
-The Kafka source example project and tutorials can be found at:
-- [Kafka connector example project](https://github.com/apache/incubator-gearpump/tree/master/examples/streaming/kafka)
-- [Connect with Kafka source](../dev/dev-connectors)
-
-In this doc, we will talk about how the at least once message delivery works.
-
-We will use the WordCount example of [source tree](https://github.com/apache/incubator-gearpump/tree/master/examples/streaming/kafka)  to illustrate.
-
-### How the kafka WordCount DAG looks like:
-
-It contains three processors:
-![Kafka WordCount](../img/kafka_wordcount.png)
-
-- KafkaStreamProducer(or KafkaSource) will read message from kafka queue.
-- Split will split lines to words
-- Sum will summarize the words to get a count for each word.
-
-### How to read data from Kafka
-
-We use KafkaSource, please check [Connect with Kafka source](../dev/dev-connectors) for the introduction.
-
-Please note that we have set a startTimestamp for the KafkaSource, which means KafkaSource will read from Kafka queue starting from messages whose timestamp is near startTimestamp.
-
-### What happen where there is Task crash or message loss?
-When there is message loss, the AppMaster will first pause the global clock service so that the global minimum timestamp no longer change, then it will restart the Kafka source tasks. Upon restart, Kafka Source will start to replay. It will first read the global minimum timestamp from AppMaster, and start to read message from that timestamp.
-
-### What method KafkaSource used to read messages from a start timestamp? As we know Kafka queue doesn't expose the timestamp information.
-
-Kafka queue only expose the offset information for each partition. What KafkaSource do is to maintain its own mapping from Kafka offset to  Application timestamp, so that we can map from a application timestamp to a Kafka offset, and replay Kafka messages from that Kafka offset.
-
-The mapping between Application timestamp with Kafka offset is stored in a distributed file system or as a Kafka topic.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/docs/introduction/message-delivery.md
----------------------------------------------------------------------
diff --git a/docs/docs/introduction/message-delivery.md b/docs/docs/introduction/message-delivery.md
deleted file mode 100644
index 1ffeb9e..0000000
--- a/docs/docs/introduction/message-delivery.md
+++ /dev/null
@@ -1,47 +0,0 @@
-## What is At Least Once Message Delivery?
-
-Messages could be lost on delivery due to network partitions. **At Least Once Message Delivery** (at least once) means the lost messages are delivered one or more times such that at least one is processed and acknowledged by the whole flow. 
-
-Gearpump guarantees at least once for any source that is able to replay message from a past timestamp. In Gearpump, each message is tagged with a timestamp, and the system tracks the minimum timestamp of all pending messages (the global minimum clock). On message loss, application will be restarted to the global minimum clock. Since the source is able to replay from the global minimum clock, all pending messages before the restart will be replayed. Gearpump calls that kind of source `TimeReplayableSource` and already provides a built in
-[KafkaSource](gearpump-internals#at-least-once-message-delivery-and-kafka). With the KafkaSource to ingest data into Gearpump, users are guaranteed at least once message delivery.
-
-## What is Exactly Once Message Delivery?
-
-At least once delivery doesn't guarantee the correctness of the application result. For instance,  for a task keeping the count of received messages, there could be overcount with duplicated messages and the count is lost on task failure.
- In that case, **Exactly Once Message Delivery** (exactly once) is required, where state is updated by a message exactly once. This further requires that duplicated messages are filtered out and in-memory states are persisted.
-
-Users are guaranteed exactly once in Gearpump if they use both a `TimeReplayableSource` to ingest data and the Persistent API to manage their in memory states. With the Persistent API, user state is periodically checkpointed by the system to a persistent store (e.g HDFS) along with its checkpointed time. Gearpump tracks the global minimum checkpoint timestamp of all pending states (global minimum checkpoint clock), which is persisted as well. On application restart, the system restores states at the global minimum checkpoint clock and source replays messages from that clock. This ensures that a message updates all states exactly once.
-
-### Persistent API
-Persistent API consists of `PersistentTask` and `PersistentState`.
-
-Here is an example of using them to keep count of incoming messages.
-
-	:::scala
-	class CountProcessor(taskContext: TaskContext, conf: UserConfig)
-  	  extends PersistentTask[Long](taskContext, conf) {
-
-  	  override def persistentState: PersistentState[Long] = {
-        import com.twitter.algebird.Monoid.longMonoid
-        new NonWindowState[Long](new AlgebirdMonoid(longMonoid), new ChillSerializer[Long])
-      }
-
-      override def processMessage(state: PersistentState[Long], message: Message): Unit = {
-        state.update(message.timestamp, 1L)
-      }
-    }
-
-   
-The `CountProcessor` creates a customized `PersistentState` which will be managed by `PersistentTask` and overrides the `processMessage` method to define how the state is updated on a new message (each new message counts as `1`, which is added to the existing value)
-
-Gearpump has already offered two types of states
- 
-1. NonWindowState - state with no time or other boundary
-2. WindowState - each state is bounded by a time window
-
-They are intended for states that satisfy monoid laws.
-
-1. has binary associative operation, like `+`  
-2. has an identity element, like `0`
-
-In the above example, we make use of the `longMonoid` from [Twitter's Algebird](https://github.com/twitter/algebird) library which provides a bunch of useful monoids. 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/docs/introduction/performance-report.md
----------------------------------------------------------------------
diff --git a/docs/docs/introduction/performance-report.md b/docs/docs/introduction/performance-report.md
deleted file mode 100644
index e4254d1..0000000
--- a/docs/docs/introduction/performance-report.md
+++ /dev/null
@@ -1,34 +0,0 @@
-# Performance Evaluation
-
-To illustrate the performance of Gearpump, we mainly focused on two aspects, throughput and latency, using a micro benchmark called SOL (an example in the Gearpump package) whose topology is quite simple. SOLStreamProducer delivers messages to SOLStreamProcessor constantly and SOLStreamProcessor does nothing. We set up a 4-nodes cluster with 10GbE network and each node's hardware is briefly shown as follows:
-
-Processor: 32 core Intel(R) Xeon(R) CPU E5-2690 2.90GHz
-Memory: 64GB
-
-## Throughput
-
-We tried to explore the upper bound of the throughput, after launching 48 SOLStreamProducer and 48 SOLStreamProcessor the Figure below shows that the whole throughput of the cluster can reach about 18 million messages/second(100 bytes per message)
-
-## Latency
-
-When we transfer message at the max throughput above, the average latency between two tasks is 8ms.
-
-## Fault Recovery time
-
-When the corruption is detected, for example the Executor is down, Gearpump will reallocate the resource and restart the application. It takes about 10 seconds to recover the application.
-
-![Dashboard](../img/dashboard.png)
-
-## How to setup the benchmark environment?
-
-### Prepare the env
-
-1). Set up a 4-nodes Gearpump cluster with 10GbE network which have 4 Workers on each node. In our test environment, each node has 64GB memory and Intel(R) Xeon(R) 32-core processor E5-2690 2.90GHz. Make sure the metrics is enabled in Gearpump.
-
-2). Submit a SOL application with 48 StreamProducers and 48 StreamProcessors:
-
-    :::bash
-    bin/gear app -jar ./examples/sol-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}-assembly.jar -streamProducer 48 -streamProcessor 48
-
-
-3). Launch Gearpump's dashboard and browser http://$HOST:8090/, switch to the Applications tab and you can see the detail information of your application. The HOST should be the node runs dashboard.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/docs/introduction/submit-your-1st-application.md
----------------------------------------------------------------------
diff --git a/docs/docs/introduction/submit-your-1st-application.md b/docs/docs/introduction/submit-your-1st-application.md
deleted file mode 100644
index 21cfaf2..0000000
--- a/docs/docs/introduction/submit-your-1st-application.md
+++ /dev/null
@@ -1,39 +0,0 @@
-Before you can submit and run your first Gearpump application, you will need a running Gearpump service.
-There are multiple ways to run Gearpump [Local mode](../deployment/deployment-local), [Standalone mode](../deployment/deployment-standalone), [YARN mode](../deployment/deployment-yarn) or [Docker mode](../deployment/deployment-docker).
-
-The easiest way is to run Gearpump in [Local mode](../deployment/deployment-local).
-Any Linux, MacOSX or Windows desktop can be used with zero configuration.
-
-In the example below, we assume your are running in [Local mode](../deployment/deployment-local).
-If you running Gearpump in one of the other modes, you will need to configure the Gearpump client to
-connect to the Gearpump service by setting the `gear.conf` configuration path in classpath.
-Within this file, you will need to change the parameter `gearpump.cluster.masters` to the correct Gearpump master(s).
-See [Configuration](../deployment/deployment-configuration) for details.
-
-## Steps to submit your first Application
-
-### Step 1: Submit application
-After the cluster is started, you can submit an example wordcount application to the cluster
-
-Open another shell,
-
-	:::bash
-	### To run WordCount example
-	bin/gear app -jar examples/wordcount-{{SCALA_BINARY_VERSION}}-{{GEARPUMP_VERSION}}-assembly.jar org.apache.gearpump.streaming.examples.wordcount.WordCount
-
-
-###  Step 2: Congratulations, you've submitted your first application.
-
-To view the application status and metrics, start the Web UI services, and browse to [http://127.0.0.1:8090](http://127.0.0.1:8090) to check the status.
-The default username and password is "admin:admin", you can check
-[UI Authentication](../deployment/deployment-ui-authentication) to find how to manage users.
-
-![Dashboard](../img/dashboard.gif)
-
-**NOTE:** the UI port setting can be defined in configuration, please check section [Configuration](../deployment/deployment-configuration).
-
-## A quick Look at the Web UI
-TBD
-
-## Other Application Examples
-Besides wordcount, there are several other example applications. Please check the source tree examples/ for detail information.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/mkdocs.yml
----------------------------------------------------------------------
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index a769960..6e23e41 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -2,6 +2,8 @@ site_name: Apache Gearpump(incubating)
 
 repo_url: 'https://github.com/apache/incubator-gearpump'
 
+edit_uri: 'edit/master/docs/contents'
+
 use_directory_urls: false
 
 # files under docs/ are copied to tmp/

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/docs/version.yml
----------------------------------------------------------------------
diff --git a/docs/version.yml b/docs/version.yml
index b13cd9d..8a18936 100644
--- a/docs/version.yml
+++ b/docs/version.yml
@@ -1,5 +1,5 @@
 ---
-GEARPUMP_VERSION: "0.8.2-SNAPSHOT"
+GEARPUMP_VERSION: "0.8.3-SNAPSHOT"
 SCALA_BINARY_VERSION: "2.11"
 SCALA_VERSION: "2.11.8"
 ---

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
----------------------------------------------------------------------
diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
index c4eec07..6db8531 100644
--- a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
+++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala
@@ -33,9 +33,9 @@ object DistributedShell extends AkkaApp with ArgumentsParser {
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     LOG.info(s"Distributed shell submitting application...")
     val context = ClientContext(akkaConf)
-    val appId = context.submit(Application[DistShellAppMaster]("DistributedShell",
+    val app = context.submit(Application[DistShellAppMaster]("DistributedShell",
     UserConfig.empty))
     context.close()
-    LOG.info(s"Distributed Shell Application started with appId $appId !")
+    LOG.info(s"Distributed Shell Application started with appId ${app.appId} !")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
index df7a517..655389b 100644
--- a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
+++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala
@@ -33,9 +33,9 @@ object DistributeService extends AkkaApp with ArgumentsParser {
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     LOG.info(s"Distribute Service submitting application...")
     val context = ClientContext(akkaConf)
-    val appId = context.submit(Application[DistServiceAppMaster]("DistributedService",
+    val app = context.submit(Application[DistServiceAppMaster]("DistributedService",
       UserConfig.empty))
     context.close()
-    LOG.info(s"Distribute Service Application started with appId $appId !")
+    LOG.info(s"Distribute Service Application started with appId ${app.appId} !")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
index 023ee35..c7bfb43 100644
--- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
@@ -21,7 +21,7 @@ import akka.actor.ActorSystem
 
 import org.apache.gearpump.cluster.{Application, ApplicationMaster, UserConfig}
 import org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.appmaster.AppMaster
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
index 3b6ceb8..165df62 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
@@ -23,7 +23,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph.{Node => GraphNode}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
index 5c75904..3a80549 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SequenceFileIO.scala
@@ -23,7 +23,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.partitioner.ShufflePartitioner
 import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala
new file mode 100644
index 0000000..df8c2f5
--- /dev/null
+++ b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.hbase
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.external.hbase.HBaseSink
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+import org.slf4j.Logger
+
+object HBaseConn extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  val RUN_FOR_EVER = -1
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "splitNum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
+    "sinkNum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
+  )
+
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    implicit val actorSystem = system
+
+    val splitNum = config.getInt("splitNum")
+    val sinkNum = config.getInt("sinkNum")
+
+    val split = new Split
+    val sourceProcessor = DataSourceProcessor(split, splitNum, "Split")
+    val sink = HBaseSink(UserConfig.empty, "hbase")
+    val sinkProcessor = DataSinkProcessor(sink, sinkNum)
+    val partitioner = new HashPartitioner
+    val computation = sourceProcessor ~ partitioner ~> sinkProcessor
+    val application = StreamApplication("HBase", Graph(computation), UserConfig.empty)
+
+    application
+
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    val appId = context.submit(application(config, context.system))
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
new file mode 100644
index 0000000..d16cd87
--- /dev/null
+++ b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.hbase
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.hadoop.hbase.util.Bytes
+
+class Split extends DataSource {
+
+  private var x = 0
+
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+  override def read(): Message = {
+
+    val tuple = (Bytes.toBytes(s"$x"), Bytes.toBytes("group"),
+      Bytes.toBytes("group:name"), Bytes.toBytes("99"))
+    x+=1
+
+    Message(tuple)
+  }
+
+  override def close(): Unit = {}
+
+  override def getWatermark: Instant = Instant.now()
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
index cfeef5b..4b48e7d 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
@@ -27,7 +27,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.partitioner.ShufflePartitioner
 import org.apache.gearpump.streaming.StreamApplication
 import org.apache.gearpump.streaming.kafka._
 import org.apache.gearpump.streaming.sink.DataSinkProcessor

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
index 49d3619..cbfe57a 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
@@ -21,8 +21,8 @@ package org.apache.gearpump.streaming.examples.kafka.dsl
 import java.util.Properties
 
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser}
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
 import org.apache.gearpump.streaming.kafka.KafkaStoreFactory
 import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL
 import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
index aa9842f..80f0ff7 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
@@ -28,7 +28,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.kafka._
 import org.apache.gearpump.streaming.sink.DataSinkProcessor
 import org.apache.gearpump.streaming.source.DataSourceProcessor

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
index fb80ad3..01aa95e 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOL.scala
@@ -23,7 +23,7 @@ import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.partitioner.ShufflePartitioner
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph._
 import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
index 9bd2bc5..59289a5 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.examples.state.processor.CountProcessor
 import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
 import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
index 629deb7..50235bc 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/WindowAverageApp.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.examples.state.processor.{NumberGeneratorProcessor, WindowAverageProcessor}
 import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
 import org.apache.gearpump.streaming.state.impl.{PersistentStateConfig, WindowConfig}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
index 6b5bba0..5e3d472 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -23,12 +23,11 @@ import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.partitioner.HashPartitioner;
-import org.apache.gearpump.partitioner.Partitioner;
+import org.apache.gearpump.streaming.partitioner.HashPartitioner;
+import org.apache.gearpump.streaming.partitioner.Partitioner;
 import org.apache.gearpump.streaming.javaapi.Graph;
 import org.apache.gearpump.streaming.javaapi.Processor;
 import org.apache.gearpump.streaming.javaapi.StreamApplication;
-import org.apache.gearpump.util.Constants;
 
 /** Java version of WordCount with Processor Graph API */
 public class WordCount {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 0ecc42e..2942861 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -19,21 +19,23 @@
 package org.apache.gearpump.streaming.examples.wordcountjava.dsl;
 
 import com.typesafe.config.Config;
+import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.task.TaskContext;
 import scala.Tuple2;
 
-import java.util.ArrayList;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Iterator;
-import java.util.List;
 
 /** Java version of WordCount with high level DSL API */
 public class WordCount {
@@ -45,41 +47,80 @@ public class WordCount {
   public static void main(Config akkaConf, String[] args) throws InterruptedException {
     ClientContext context = new ClientContext(akkaConf);
     JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
-    List<String> source = new ArrayList<>(Arrays.asList("This is a good start, bingo!! bingo!!"));
-
-    JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
-
-    JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> apply(String s) {
-        return new ArrayList<String>(Arrays.asList(s.split("\\s+"))).iterator();
-      }
-    }, "flatMap");
-
-    JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
-      @Override
-      public Tuple2<String, Integer> apply(String s) {
-        return new Tuple2<String, Integer>(s, 1);
-      }
-    }, "map");
-
-    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
-      @Override
-      public String apply(Tuple2<String, Integer> tuple) {
-        return tuple._1();
-      }
-    }, 1, "groupBy");
-
-    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-      @Override
-      public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
-        return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
-      }
-    }, "reduce");
+
+    JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"),
+        1, UserConfig.empty(), "source");
+
+    JavaStream<String> words = sentence.flatMap(new Split(), "flatMap");
+
+    JavaStream<Tuple2<String, Integer>> ones = words.map(new Ones(), "map");
+
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(), 1, "groupBy");
+
+    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new Count(), "reduce");
 
     wordcount.log();
 
-    app.run();
+    app.submit();
     context.close();
   }
+
+  private static class StringSource implements DataSource {
+
+    private final String str;
+
+    StringSource(String str) {
+      this.str = str;
+    }
+
+    @Override
+    public void open(TaskContext context, Instant startTime) {
+    }
+
+    @Override
+    public Message read() {
+      return Message.apply(str, Instant.now().toEpochMilli());
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+  }
+
+  private static class Split extends FlatMapFunction<String, String> {
+
+    @Override
+    public Iterator<String> apply(String s) {
+      return Arrays.asList(s.split("\\s+")).iterator();
+    }
+  }
+
+  private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> apply(String s) {
+      return new Tuple2<>(s, 1);
+    }
+  }
+
+  private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+      return new Tuple2<>(t1._1(), t1._2() + t2._2());
+    }
+  }
+
+  private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
+
+    @Override
+    public String apply(Tuple2<String, Integer> tuple) {
+      return tuple._1();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
index 9580e63..0e3d840 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -23,7 +23,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.source.DataSourceProcessor
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph.Node

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index 4f43fd4..401eac0 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -22,7 +22,7 @@ import java.time.{Duration, Instant}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
 import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.TaskContext

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 22f597c..1cbfb22 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -20,8 +20,8 @@ package org.apache.gearpump.streaming.examples.wordcount.dsl
 
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.StreamApp._
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._
 import org.apache.gearpump.util.AkkaApp
 
 /** Same WordCount with High level DSL syntax */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
index 016a7b2..4384b39 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
index 9ff701c..07c95f8 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,11 +24,11 @@ import akka.NotUsed
 import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem}
 import akka.event.{Logging, LoggingAdapter}
 import akka.stream.Attributes.Attribute
-import akka.stream._
 import akka.stream.impl.Stages.SymbolicGraphStage
-import akka.stream.impl.StreamLayout._
-import akka.stream.impl._
+import akka.stream.impl.StreamLayout.{Atomic, Combine, CopiedModule, Ignore, MaterializedValueNode, Module, Transform}
+import akka.stream.{ActorAttributes, ActorMaterializerSettings, Attributes, ClosedShape, Fusing, Graph, InPort, OutPort, SinkShape}
 import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
+import akka.stream.impl.{ExtendedActorMaterializer, StreamSupervisor}
 import akka.stream.stage.GraphStage
 import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
 import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
@@ -43,16 +43,13 @@ import scala.concurrent.duration.FiniteDuration
 
 object GearpumpMaterializer {
 
-  final val Debug = true
-
   final case class Edge(from: OutPort, to: InPort)
 
   final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute
 
   implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool)
 
-  def apply(strategy: Strategy)(implicit context: ActorRefFactory):
-  ExtendedActorMaterializer = {
+  def apply(strategy: Strategy)(implicit context: ActorRefFactory): ExtendedActorMaterializer = {
     val system = actorSystemOf(context)
 
     apply(ActorMaterializerSettings(
@@ -95,7 +92,7 @@ object GearpumpMaterializer {
       case _ =>
         throw new IllegalArgumentException(
           s"""
-             |  context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
+            |  context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
           """.stripMargin
         )
     }
@@ -166,19 +163,10 @@ class GearpumpMaterializer(override val system: ActorSystem,
     system.scheduler.scheduleOnce(delay, task)(executionContext)
 
   override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
-    val initialAttributes = Attributes(
-      Attributes.InputBuffer(
-        settings.initialInputBufferSize,
-        settings.maxInputBufferSize
-      ) ::
-      ActorAttributes.Dispatcher(settings.dispatcher) ::
-      ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
-      Nil)
-
     val info = Fusing.aggressive(runnableGraph).module.info
     val graph = GGraph.empty[Module, Edge]
 
-    info.allModules.foreach(module => {
+    info.subModules.foreach(module => {
       if (module.isCopied) {
         val original = module.asInstanceOf[CopiedModule].copyOf
         graph.addVertex(original)
@@ -201,9 +189,7 @@ class GearpumpMaterializer(override val system: ActorSystem,
       }
     })
 
-    if(Debug) {
-      printGraph(graph)
-    }
+    printGraph(graph)
 
     val subGraphs = GraphPartitioner(strategy).partition(graph)
     val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (map, subGraph) =>
@@ -226,7 +212,7 @@ class GearpumpMaterializer(override val system: ActorSystem,
       }
     }).toList
     val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
-    val mat2 = resolveMaterialized(matModule.materializedValueComputation, matValues)
+    resolveMaterialized(matModule.materializedValueComputation, matValues)
     val rt = Some(mat).flatMap(any => {
       any match {
         case promise: Promise[_] =>
@@ -235,7 +221,7 @@ class GearpumpMaterializer(override val system: ActorSystem,
           Some(other)
       }
     })
-    rt.getOrElse(null).asInstanceOf[Mat]
+    rt.orNull.asInstanceOf[Mat]
   }
 
   private def printGraph(graph: GGraph[Module, Edge]): Unit = {
@@ -269,10 +255,24 @@ class GearpumpMaterializer(override val system: ActorSystem,
   }
 
   override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
-      subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
+      initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
     materialize(runnableGraph)
   }
 
+  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+    logger
+  }
+
   def shutdown: Unit = {
     subMaterializers.values.foreach(_.shutdown)
   }
@@ -288,5 +288,8 @@ class GearpumpMaterializer(override val system: ActorSystem,
     case Ignore =>
       ()
   }
+
+
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
index 871dcf8..8a869d2 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
index 40cd556..52a45d9 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
index 71678c3..826cdcf 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
index b80398c..087c57d 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -38,8 +38,8 @@ object Test11 extends AkkaApp with ArgumentsParser {
 
     implicit val system = ActorSystem("Test11", akkaConfig)
     implicit val materializer = GearpumpMaterializer()
-//    implicit val materializer =
-//    ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+    // implicit val materializer =
+    //   ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
     implicit val ec = system.dispatcher
 
     val g = RunnableGraph.fromGraph(GraphDSL.create() {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
index a9e8b08..b4f4bce 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -37,9 +37,9 @@ object Test12 extends AkkaApp with ArgumentsParser{
     import scala.concurrent.duration._
 
     implicit val system = ActorSystem("Test12", akkaConfig)
-//    implicit val materializer = ActorMaterializer(
-//      ActorMaterializerSettings(system).withAutoFusing(false)
-//    )
+    // implicit val materializer = ActorMaterializer(
+    //   ActorMaterializerSettings(system).withAutoFusing(false)
+    //   )
     implicit val materializer = GearpumpMaterializer()
     implicit val ec = system.dispatcher
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
index 984c861..2e036cb 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
index 0542f43..c436130 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
index c2f8d5f..f4e4dbd 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
index eb0b5c7..9691496 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,11 @@
 
 package org.apache.gearpump.akkastream.example
 
-import akka.actor.{Actor, ActorSystem, Props}
+import akka.actor.ActorSystem
 import org.apache.gearpump.akkastream.GearpumpMaterializer
 import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
-import akka.stream.scaladsl.Sink
 import org.apache.gearpump.cluster.main.ArgumentsParser
-import org.apache.gearpump.streaming.dsl.{CollectionDataSource, LoggerSink}
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, LoggerSink}
 import org.apache.gearpump.util.AkkaApp
 
 import scala.concurrent.Await

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
index 21f1b8c..a6049cd 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
index 0a51078..24faeb3 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,7 +24,7 @@ import org.apache.gearpump.akkastream.GearpumpMaterializer
 import org.apache.gearpump.akkastream.scaladsl.GearSource
 import akka.stream.scaladsl.Sink
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
 import org.apache.gearpump.util.AkkaApp
 
 import scala.concurrent.Await

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
index 3cb69ce..6a44a35 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,7 +23,7 @@ import akka.stream.scaladsl.Source
 import org.apache.gearpump.akkastream.GearpumpMaterializer
 import org.apache.gearpump.akkastream.scaladsl.GearSink
 import org.apache.gearpump.cluster.main.ArgumentsParser
-import org.apache.gearpump.streaming.dsl.LoggerSink
+import org.apache.gearpump.streaming.dsl.scalaapi.LoggerSink
 import org.apache.gearpump.util.AkkaApp
 
 import scala.concurrent.Await

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
index 72e21c7..ad87a97 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
index 6f54933..a525471 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,7 +24,7 @@ import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
 import org.apache.gearpump.akkastream.GearpumpMaterializer
 import org.apache.gearpump.akkastream.scaladsl.GearSource
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
 import org.apache.gearpump.util.AkkaApp
 
 import scala.concurrent.Await

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
index be91610..8c837af 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
index 434aa33..ad2ac61 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
index 63f9e2d..66414e0 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
index 830f278..2a1e7ff 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
index c1e95bb..f7919c0 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,