Posted to issues@flink.apache.org by mbalassi <gi...@git.apache.org> on 2014/07/17 12:09:55 UTC

[GitHub] incubator-flink pull request: Streaming addon prototype

GitHub user mbalassi opened a pull request:

    https://github.com/apache/incubator-flink/pull/72

    Streaming addon prototype

    Support for low-latency streaming jobs.
    
    The API provided is fairly similar to the batch one, for example see our [WordCount](https://github.com/mbalassi/incubator-flink/blob/streaming-ready/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java).
    
    Legal issues were extensively discussed on the mailing list. 
    
    Further work is needed on fault tolerance, state management, and cluster performance optimization.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbalassi/incubator-flink streaming-ready

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/72.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #72
    
----
commit 64580abe83c284bc82407f8079d18ad2caf9040e
Author: Yingjun Wu <wu...@gmail.com>
Date:   2014-07-14T14:29:11Z

    [streaming] minor bug fixed

commit ed35783978f327667708c61984479e164ca67511
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:11Z

    [streaming] basic RabbitMQ topology added to test future support for RabbitMQ

commit 9d6f69505d8bc67e9fe4a8601cc0653068424778
Author: gaborhermann <re...@gmail.com>
Date:   2014-07-14T14:29:11Z

    [streaming] FaultTolerance set via JobGraphBuilder config

commit 37adc7c7ed1e08f919dad077ca206e22817ee5d7
Author: gaborhermann <re...@gmail.com>
Date:   2014-07-14T14:29:11Z

    [streaming] Merged RMQTopology

commit 27dcac7a859e5037222c45ea18ecc7f7e8204a4b
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:11Z

    [streaming] added support for udf objects in jobgraphbuilder

commit 05ef21e6e64b589e452a7b8e82bbca72bc7f809a
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:11Z

    [streaming] JavaDoc and api update

commit f5c1eb0cc535da69f29544478859c5c878f6db20
Author: ghermann <re...@gmail.com>
Date:   2014-07-14T14:29:11Z

    [streaming] Fixed double logging, added FieldTypeMismatchException

commit dbc3be1971f1c53e3da2cc9c8030a32fbea4b849
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] api refactor

commit 7c668966b7e4d793992a3ac7e43f6ee4584aa977
Author: Yingjun Wu <wu...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] add clear function for StreamRecord, add framework of checkpointer.

commit e795794b9390d865d12cc944771c0670903405cc
Author: Márton Balassi <ba...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] License fix

commit 01b8e3ebd5a2c73b6d98b3c0c17d1fee4ee6abff
Author: Márton Balassi <ba...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] Version set to 0.5

commit 17f6e9ae40762a963e4b7675722b7a6835c8f45f
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] api cleanup

commit 39f4e64ffa94e35e239dde7586258872ba9a31b4
Author: Yingjun Wu <wu...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] add frameworks for iterative computation.

commit f224993173bbd8974ae2b7b35f367fe168e22429
Author: Yingjun Wu <wu...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] add stream join and stream window join example, refactor window state

commit 8fb83534d14a3c1de29b2fd6a6ac0ce851d4eb0a
Author: Yingjun Wu <wu...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] add license & implement iterative processing

commit c67db12c2aa83745fc29e4a71f6b66b38b322719
Author: Márton Balassi <ba...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] Version set to 0.5

commit d8271fa429686b019352059604ba92d325a5dcf2
Author: Yingjun Wu <wu...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] add kafka topology

commit 693e338949beeee6ebb462e83c6bcf549c1cf924
Author: ghermann <re...@gmail.com>
Date:   2014-07-14T14:29:12Z

    [streaming] Replaced List<RecordReader> with UnionRecordReader

commit c8f2e151b4f874d88d619ab76a9d8a6313340c3d
Author: ghermann <re...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] Refactored StreamComponentHelper and StreamComponentTest

commit 697a747f9c832b395b94c2b044a0ec4b4607eaf9
Author: ghermann <re...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] Refactored logging to avoid expensive String concatenation

commit d0943c1e5f208cf6a980efc472ba2355979391a8
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] branch added for new api

commit f9b3c12357c3b76488e39cced160b06fd6b20193
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] StreamCollector added

commit baca005ee950f919c5ee379167f2bcc482209b7f
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] streamrecord reworked

commit eb35e02e15d515ca8505423cee62a562d4aa4e36
Author: ghermann <re...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] datastream added

commit 292ca5f3f1cc024f72b7e194e8c69634a3071628
Author: Márton Balassi <ba...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] Project version set to 0.1

commit 3050b6f1d0aa4fbd1c77dcaea98952831112139b
Author: Márton Balassi <ba...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] Project version set to 0.2-SNAPSHOT

commit 019ff793f949fbf9fcafa52e0e3b6885374d3e27
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] array based streamrecord added

commit cea6d66511046f3d1d2f4947d1c73ede5a33719b
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] license added

commit 5e510e3dca98d2f426cb9c1409f9ce8f08851037
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:13Z

    [streaming] Standard and Union StreamRecordReader added for improved serialization, not yet added to StreamTask and Sink

commit 27a8f6ae7a78a1d57c8a14e74fcf0f2293bfe3c3
Author: gyfora <gy...@gmail.com>
Date:   2014-07-14T14:29:14Z

    [streaming] created abstract streamrecord class + added StreamRecordReader and UnionStreamRecordReader to streamcomponenthelper

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15466098
  
    --- Diff: flink-addons/flink-streaming/LICENSE ---
    @@ -0,0 +1,191 @@
    +Apache License
    +Version 2.0, January 2004
    --- End diff --
    
    This file is also not necessary.


---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15576614
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java ---
    @@ -0,0 +1,700 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.api;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.java.functions.FilterFunction;
    +import org.apache.flink.api.java.functions.FlatMapFunction;
    +import org.apache.flink.api.java.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.functions.MapFunction;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
    +import org.apache.flink.streaming.api.collector.OutputSelector;
    +import org.apache.flink.streaming.api.function.co.CoMapFunction;
    +import org.apache.flink.streaming.api.function.sink.SinkFunction;
    +import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
    +import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
    +import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
    +
    +/**
    + * A DataStream represents a stream of elements of the same type. A DataStream
    + * can be transformed into another DataStream by applying a transformation as
    + * for example
    + * <ul>
    + * <li>{@link DataStream#map},</li>
    + * <li>{@link DataStream#filter}, or</li>
    + * <li>{@link DataStream#batchReduce}.</li>
    + * </ul>
    + * 
    + * @param <T>
    + *            The type of the DataStream, i.e., the type of the elements of the
    + *            DataStream.
    + */
    +public class DataStream<T extends Tuple> {
    +
    +	protected static Integer counter = 0;
    +	protected final StreamExecutionEnvironment environment;
    +	protected String id;
    +	protected int degreeOfParallelism;
    +	protected String userDefinedName;
    +	protected OutputSelector<T> outputSelector;
    +	protected List<String> connectIDs;
    +	protected List<ConnectionType> ctypes;
    +	protected List<Integer> cparams;
    +	protected boolean iterationflag;
    +	protected Integer iterationID;
    +
    +	/**
    +	 * Create a new {@link DataStream} in the given execution environment with
    +	 * partitioning set to shuffle by default.
    +	 * 
    +	 * @param environment
    +	 *            StreamExecutionEnvironment
    +	 * @param operatorType
    +	 *            The type of the operator in the component
    +	 */
    +	protected DataStream(StreamExecutionEnvironment environment, String operatorType) {
    +		if (environment == null) {
    +			throw new NullPointerException("context is null");
    +		}
    +
    +		// TODO add name based on component number and preferably a sequential id
    +		counter++;
    +		this.id = operatorType + "-" + counter.toString();
    +		this.environment = environment;
    +		this.degreeOfParallelism = environment.getDegreeOfParallelism();
    +		initConnections();
    +
    +	}
    +
    +	/**
    +	 * Create a new DataStream by creating a copy of another DataStream
    +	 * 
    +	 * @param dataStream
    +	 *            The DataStream that will be copied.
    +	 */
    +	protected DataStream(DataStream<T> dataStream) {
    +		this.environment = dataStream.environment;
    +		this.id = dataStream.id;
    +		this.degreeOfParallelism = dataStream.degreeOfParallelism;
    +		this.userDefinedName = dataStream.userDefinedName;
    +		this.outputSelector = dataStream.outputSelector;
    +		this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
    +		this.ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>(dataStream.ctypes);
    +		this.cparams = new ArrayList<Integer>(dataStream.cparams);
    +		this.iterationflag = dataStream.iterationflag;
    +		this.iterationID = dataStream.iterationID;
    +	}
    +
    +	/**
    +	 * Initialize the connection and partitioning among the connected
    +	 * {@link DataStream}s.
    +	 */
    +	private void initConnections() {
    +		connectIDs = new ArrayList<String>();
    +		connectIDs.add(getId());
    +		ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>();
    +		ctypes.add(ConnectionType.SHUFFLE);
    +		cparams = new ArrayList<Integer>();
    +		cparams.add(0);
    +
    +	}
    +
    +	/**
    +	 * Returns the ID of the {@link DataStream}.
    +	 * 
    +	 * @return ID of the DataStream
    +	 */
    +	public String getId() {
    +		return id;
    +	}
    +
    +	/**
    +	 * Sets the mutability of the operator represented by the DataStream. If the
    +	 * operator is set to mutable, the tuples received in the user defined
    +	 * functions will be reused after the function call. Setting an operator to
    +	 * mutable greatly reduces garbage collection overhead and thus improves
    +	 * scalability.
    +	 * 
    +	 * @param isMutable
    +	 *            The mutability of the operator.
    +	 * @return The DataStream with mutability set.
    +	 */
    +	public DataStream<T> setMutability(boolean isMutable) {
    +		environment.setMutability(this, isMutable);
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the degree of parallelism for this operator. The degree must be 1 or
    +	 * more.
    +	 * 
    +	 * @param dop
    +	 *            The degree of parallelism for this operator.
    +	 * @return The operator with set degree of parallelism.
    +	 */
    +	public DataStream<T> setParallelism(int dop) {
    +		if (dop < 1) {
    +			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
    +		}
    +		this.degreeOfParallelism = dop;
    +
    +		environment.setOperatorParallelism(this);
    +
    +		return new DataStream<T>(this);
    +
    +	}
    +
    +	/**
    +	 * Gets the degree of parallelism for this operator.
    +	 * 
    +	 * @return The parallelism set for this operator.
    +	 */
    +	public int getParallelism() {
    +		return this.degreeOfParallelism;
    +	}
    +
    +	/**
    +	 * Gives the data transformation (vertex) a user defined name for use
    +	 * at directed outputs. The {@link OutputSelector} of the input vertex
    +	 * should use this name for directed emits.
    +	 * 
    +	 * @param name
    +	 *            The name to set
    +	 * @return The named DataStream.
    +	 */
    +	public DataStream<T> name(String name) {
    +		// copy?
    +		if (name == "") {
    --- End diff --
    
    Sure, pesky mistake. Thanks. :)
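    
    For context, the check name == "" on the last quoted line compares string
    references rather than contents in Java, so it is almost never true for a
    caller-supplied argument. A minimal sketch of the usual content-based fix
    (the exception message is illustrative):
    
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Operator name must not be empty.");
        }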


---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/72#issuecomment-52489006
  
    Now that all legal requirements are cleared and we are in past-release-candidate mode (again merging stuff), I will add this to the main line...


---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15576424
  
    --- Diff: flink-addons/flink-streaming/.travis.yml ---
    @@ -0,0 +1,11 @@
    +
    +language: java
    --- End diff --
    
    Deleted the unnecessary files.


---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15466173
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java ---
    @@ -0,0 +1,98 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.streaming.api.DataStream;
    +import org.apache.flink.streaming.api.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.function.source.SourceFunction;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.util.Collector;
    +
    +public class KafkaTopology {
    +
    +	public static final class MySource extends SourceFunction<Tuple1<String>> {
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public void invoke(Collector<Tuple1<String>> collector) throws Exception {
    +			// TODO Auto-generated method stub
    --- End diff --
    
    unnecessary
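    
    The fix is simply to drop the auto-generated stub comment; the method body,
    quoted in full in a later comment on this file, would then read:
    
        @Override
        public void invoke(Collector<Tuple1<String>> collector) throws Exception {
            // Emit ten numbered tuples, then a final marker record.
            for (int i = 0; i < 10; i++) {
                collector.collect(new Tuple1<String>(Integer.toString(i)));
            }
            collector.collect(new Tuple1<String>("q"));
        }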


---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15466695
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---
    @@ -0,0 +1,825 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.api;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.SerializationUtils;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.io.network.channels.ChannelType;
    +import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
    +import org.apache.flink.runtime.jobgraph.DistributionPattern;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
    +import org.apache.flink.runtime.jobgraph.JobInputVertex;
    +import org.apache.flink.runtime.jobgraph.JobOutputVertex;
    +import org.apache.flink.runtime.jobgraph.JobTaskVertex;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.streaming.api.collector.OutputSelector;
    +import org.apache.flink.streaming.api.invokable.SinkInvokable;
    +import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
    +import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
    +import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
    +import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
    +import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
    +import org.apache.flink.streaming.api.streamcomponent.StreamIterationSource;
    +import org.apache.flink.streaming.api.streamcomponent.StreamSink;
    +import org.apache.flink.streaming.api.streamcomponent.StreamSource;
    +import org.apache.flink.streaming.api.streamcomponent.StreamTask;
    +import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
    +import org.apache.flink.streaming.partitioner.DistributePartitioner;
    +import org.apache.flink.streaming.partitioner.FieldsPartitioner;
    +import org.apache.flink.streaming.partitioner.ForwardPartitioner;
    +import org.apache.flink.streaming.partitioner.GlobalPartitioner;
    +import org.apache.flink.streaming.partitioner.ShufflePartitioner;
    +import org.apache.flink.streaming.partitioner.StreamPartitioner;
    +
    +/**
    + * Object for building Flink stream processing job graphs
    + */
    +public class JobGraphBuilder {
    +
    +	private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
    +	private final JobGraph jobGraph;
    +
    +	// Graph attributes
    +	private Map<String, AbstractJobVertex> components;
    +	private Map<String, Integer> componentParallelism;
    +	private Map<String, ArrayList<String>> outEdgeList;
    +	private Map<String, ArrayList<Integer>> outEdgeType;
    +	private Map<String, Boolean> mutability;
    +	private Map<String, List<String>> inEdgeList;
    +	private Map<String, List<StreamPartitioner<? extends Tuple>>> connectionTypes;
    +	private Map<String, String> userDefinedNames;
    +	private Map<String, String> operatorNames;
    +	private Map<String, StreamComponentInvokable> invokableObjects;
    +	private Map<String, byte[]> serializedFunctions;
    +	private Map<String, byte[]> outputSelectors;
    +	private Map<String, Class<? extends AbstractInvokable>> componentClasses;
    +	private Map<String, String> iterationIds;
    +	private Map<String, String> iterationHeadNames;
    +	private Map<String, Integer> iterationTailCount;
    +
    +	private String maxParallelismVertexName;
    +	private int maxParallelism;
    +
    +	/**
    +	 * Creates a new {@link JobGraph} with the given name. A JobGraph is a DAG
    +	 * and consists of sources, tasks (intermediate vertices) and sinks. A
    +	 * JobGraph must contain at least a source and a sink.
    +	 * 
    +	 * @param jobGraphName
    +	 *            Name of the JobGraph
    +	 */
    +	public JobGraphBuilder(String jobGraphName) {
    +
    +		jobGraph = new JobGraph(jobGraphName);
    +
    +		components = new HashMap<String, AbstractJobVertex>();
    +		componentParallelism = new HashMap<String, Integer>();
    +		outEdgeList = new HashMap<String, ArrayList<String>>();
    +		outEdgeType = new HashMap<String, ArrayList<Integer>>();
    +		mutability = new HashMap<String, Boolean>();
    +		inEdgeList = new HashMap<String, List<String>>();
    +		connectionTypes = new HashMap<String, List<StreamPartitioner<? extends Tuple>>>();
    +		userDefinedNames = new HashMap<String, String>();
    +		operatorNames = new HashMap<String, String>();
    +		invokableObjects = new HashMap<String, StreamComponentInvokable>();
    +		serializedFunctions = new HashMap<String, byte[]>();
    +		outputSelectors = new HashMap<String, byte[]>();
    +		componentClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
    +		iterationIds = new HashMap<String, String>();
    +		iterationHeadNames = new HashMap<String, String>();
    +		iterationTailCount = new HashMap<String, Integer>();
    +
    +		maxParallelismVertexName = "";
    +		maxParallelism = 0;
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("JobGraph created");
    +		}
    +	}
    +
    +	/**
    +	 * Adds a source to the JobGraph with the given parameters
    +	 * 
    +	 * @param componentName
    +	 *            Name of the component
    +	 * @param InvokableObject
    +	 *            User defined operator
    +	 * @param operatorName
    +	 *            Operator type
    +	 * @param serializedFunction
    +	 *            Serialized udf
    +	 * @param parallelism
    +	 *            Number of parallel instances created
    +	 */
    +	public void addSource(String componentName,
    +			UserSourceInvokable<? extends Tuple> InvokableObject, String operatorName,
    +			byte[] serializedFunction, int parallelism) {
    +
    +		addComponent(componentName, StreamSource.class, InvokableObject, operatorName,
    +				serializedFunction, parallelism);
    +
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("SOURCE: " + componentName);
    +		}
    +	}
    +
    +	/**
    +	 * Adds a source for the iteration head to the {@link JobGraph}. The iterated
    +	 * tuples will be fed from this component back to the graph.
    +	 * 
    +	 * @param componentName
    +	 *            Name of the component
    +	 * @param iterationHead
    +	 *            Id of the iteration head
    +	 * @param iterationID
    +	 *            ID of iteration for multiple iterations
    +	 * @param parallelism
    +	 *            Number of parallel instances created
    +	 */
    +	public void addIterationSource(String componentName, String iterationHead, String iterationID,
    +			int parallelism) {
    +
    +		addComponent(componentName, StreamIterationSource.class, null, null, null, parallelism);
    +		iterationIds.put(componentName, iterationID);
    +		iterationHeadNames.put(iterationID, componentName);
    +
    +		setBytesFrom(iterationHead, componentName);
    +
    +		setEdge(componentName, iterationHead,
    +				connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0);
    +
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("ITERATION SOURCE: " + componentName);
    +		}
    +	}
    +
    +	/**
    +	 * Adds a task to the JobGraph with the given parameters
    +	 * 
    +	 * @param componentName
    +	 *            Name of the component
    +	 * @param taskInvokableObject
    +	 *            User defined operator
    +	 * @param operatorName
    +	 *            Operator type
    +	 * @param serializedFunction
    +	 *            Serialized udf
    +	 * @param parallelism
    +	 *            Number of parallel instances created
    +	 */
    +	public <IN extends Tuple, OUT extends Tuple> void addTask(String componentName,
    +			UserTaskInvokable<IN, OUT> taskInvokableObject, String operatorName,
    +			byte[] serializedFunction, int parallelism) {
    +
    +		addComponent(componentName, StreamTask.class, taskInvokableObject, operatorName,
    +				serializedFunction, parallelism);
    +
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("TASK: " + componentName);
    +		}
    +	}
    +
    +	public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> void addCoTask(
    +			String componentName, CoInvokable<IN1, IN2, OUT> taskInvokableObject,
    +			String operatorName, byte[] serializedFunction, int parallelism) {
    +
    +		addComponent(componentName, CoStreamTask.class, taskInvokableObject, operatorName,
    +				serializedFunction, parallelism);
    +
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("CO-TASK: " + componentName);
    +		}
    +	}
    +
    +	/**
    +	 * Adds a sink to the JobGraph with the given parameters
    +	 * 
    +	 * @param componentName
    +	 *            Name of the component
    +	 * @param InvokableObject
    +	 *            User defined operator
    +	 * @param operatorName
    +	 *            Operator type
    +	 * @param serializedFunction
    +	 *            Serialized udf
    +	 * @param parallelism
    +	 *            Number of parallel instances created
    +	 */
    +	public void addSink(String componentName, SinkInvokable<? extends Tuple> InvokableObject,
    +			String operatorName, byte[] serializedFunction, int parallelism) {
    +
    +		addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
    +				serializedFunction, parallelism);
    +
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("SINK: " + componentName);
    +		}
    +
    +	}
    +
    +	/**
    +	 * Adds a sink for an iteration tail to the {@link JobGraph}. The tuples
    +	 * intended to be iterated will be sent to this sink from the iteration
    +	 * head.
    +	 * 
    +	 * @param componentName
    +	 *            Name of the component
    +	 * @param iterationTail
    +	 *            Id of the iteration tail
    +	 * @param iterationID
    +	 *            ID of iteration for multiple iterations
    +	 * @param parallelism
    +	 *            Number of parallel instances created
    +	 * @param directName
    +	 *            Id of the output direction
    +	 */
    +	public void addIterationSink(String componentName, String iterationTail, String iterationID,
    +			int parallelism, String directName) {
    +
    +		addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
    +		iterationIds.put(componentName, iterationID);
    +		setBytesFrom(iterationTail, componentName);
    +
    +		if (directName != null) {
    +			setUserDefinedName(componentName, directName);
    +		} else {
    +			setUserDefinedName(componentName, "iterate");
    +		}
    +
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("ITERATION SINK: " + componentName);
    +		}
    +
    +	}
    +
    +	/**
    +	 * Sets component parameters in the JobGraph
    +	 * 
    +	 * @param componentName
    +	 *            Name of the component
    +	 * @param componentClass
    +	 *            The class of the vertex
    +	 * @param invokableObject
    +	 *            The user defined invokable object
    +	 * @param operatorName
    +	 *            Type of the user defined operator
    +	 * @param serializedFunction
    +	 *            Serialized operator
    +	 * @param parallelism
    +	 *            Number of parallel instances created
    +	 */
    +	private void addComponent(String componentName,
    +			Class<? extends AbstractInvokable> componentClass,
    +			StreamComponentInvokable invokableObject, String operatorName,
    +			byte[] serializedFunction, int parallelism) {
    +
    +		componentClasses.put(componentName, componentClass);
    +		setParallelism(componentName, parallelism);
    +		mutability.put(componentName, false);
    +		invokableObjects.put(componentName, invokableObject);
    +		operatorNames.put(componentName, operatorName);
    +		serializedFunctions.put(componentName, serializedFunction);
    +		outEdgeList.put(componentName, new ArrayList<String>());
    +		outEdgeType.put(componentName, new ArrayList<Integer>());
    +		inEdgeList.put(componentName, new ArrayList<String>());
    +		connectionTypes.put(componentName, new ArrayList<StreamPartitioner<? extends Tuple>>());
    +		iterationTailCount.put(componentName, 0);
    +	}
    +
    +	/**
    +	 * Creates an {@link AbstractJobVertex} in the {@link JobGraph} and sets its
    +	 * config parameters using the ones set previously.
    +	 * 
    +	 * @param componentName
    +	 *            Name of the component for which the vertex will be created.
    +	 */
    +	private void createVertex(String componentName) {
    +
    +		// Get vertex attributes
    +		Class<? extends AbstractInvokable> componentClass = componentClasses.get(componentName);
    +		StreamComponentInvokable invokableObject = invokableObjects.get(componentName);
    +		String operatorName = operatorNames.get(componentName);
    +		byte[] serializedFunction = serializedFunctions.get(componentName);
    +		int parallelism = componentParallelism.get(componentName);
    +		byte[] outputSelector = outputSelectors.get(componentName);
    +		String userDefinedName = userDefinedNames.get(componentName);
    +
    +		// Create vertex object
    +		AbstractJobVertex component = null;
    +		if (componentClass.equals(StreamSource.class)
    +				|| componentClass.equals(StreamIterationSource.class)) {
    +			component = new JobInputVertex(componentName, this.jobGraph);
    +		} else if (componentClass.equals(StreamTask.class)
    +				|| componentClass.equals(CoStreamTask.class)) {
    +			component = new JobTaskVertex(componentName, this.jobGraph);
    +		} else if (componentClass.equals(StreamSink.class)
    +				|| componentClass.equals(StreamIterationSink.class)) {
    +			component = new JobOutputVertex(componentName, this.jobGraph);
    +		}
    +
    +		component.setInvokableClass(componentClass);
    +		component.setNumberOfSubtasks(parallelism);
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("Parallelism set: " + parallelism + " for " + componentName);
    +		}
    +
    +		Configuration config = component.getConfiguration();
    +
    +		config.setBoolean("isMutable", mutability.get(componentName));
    +
    +		// Set vertex config
    +		if (invokableObject != null) {
    +			config.setClass("userfunction", invokableObject.getClass());
    +			addSerializedObject(invokableObject, config);
    +		}
    +		config.setString("componentName", componentName);
    +		if (serializedFunction != null) {
    +			config.setBytes("operator", serializedFunction);
    +			config.setString("operatorName", operatorName);
    +		}
    +
    +		if (userDefinedName != null) {
    +			config.setString("userDefinedName", userDefinedName);
    +		}
    +
    +		if (outputSelector != null) {
    +			config.setBoolean("directedEmit", true);
    +			config.setBytes("outputSelector", outputSelector);
    +		}
    +
    +		if (componentClass.equals(StreamIterationSource.class)
    +				|| componentClass.equals(StreamIterationSink.class)) {
    +			config.setString("iteration-id", iterationIds.get(componentName));
    +		}
    +
    +		components.put(componentName, component);
    +
    +		if (parallelism > maxParallelism) {
    +			maxParallelism = parallelism;
    +			maxParallelismVertexName = componentName;
    +		}
    +	}
    +
    +	/**
    +	 * Adds serialized invokable object to the JobVertex configuration
    +	 * 
    +	 * @param invokableObject
    +	 *            Invokable object to serialize
    +	 * @param config
    +	 *            JobVertex configuration to which the serialized invokable will
    +	 *            be added
    +	 */
    +	private void addSerializedObject(Serializable invokableObject, Configuration config) {
    +
    +		ByteArrayOutputStream baos = null;
    +		ObjectOutputStream oos = null;
    +		try {
    +			baos = new ByteArrayOutputStream();
    +
    +			oos = new ObjectOutputStream(baos);
    +
    +			oos.writeObject(invokableObject);
    +
    +			config.setBytes("serializedudf", baos.toByteArray());
    --- End diff --
    
    I think it's better to use a constant string here (static final String ...)
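    
    A minimal sketch of that suggestion (the constant name is illustrative):
    
        // Centralizing the config key avoids scattered, typo-prone string literals.
        private static final String SERIALIZED_UDF_KEY = "serializedudf";
    
        // ... then, inside addSerializedObject:
        config.setBytes(SERIALIZED_UDF_KEY, baos.toByteArray());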


---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-flink/pull/72


---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15466188
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java ---
    @@ -0,0 +1,98 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.streaming.api.DataStream;
    +import org.apache.flink.streaming.api.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.function.source.SourceFunction;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.util.Collector;
    +
    +public class KafkaTopology {
    +
    +	public static final class MySource extends SourceFunction<Tuple1<String>> {
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public void invoke(Collector<Tuple1<String>> collector) throws Exception {
    +			// TODO Auto-generated method stub
    +			for (int i = 0; i < 10; i++) {
    +				collector.collect(new Tuple1<String>(Integer.toString(i)));
    +			}
    +			collector.collect(new Tuple1<String>("q"));
    +
    +		}
    +	}
    +
    +	public static final class MyKafkaSource extends KafkaSource<Tuple1<String>, String> {
    +		private static final long serialVersionUID = 1L;
    +
    +		public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
    +			super(zkQuorum, groupId, topicId, numThreads);
    +			// TODO Auto-generated constructor stub
    --- End diff --
    
    unnecessary


---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15576637
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---
    +	private void addSerializedObject(Serializable invokableObject, Configuration config) {
    +
    +		ByteArrayOutputStream baos = null;
    +		ObjectOutputStream oos = null;
    +		try {
    +			baos = new ByteArrayOutputStream();
    +
    +			oos = new ObjectOutputStream(baos);
    +
    +			oos.writeObject(invokableObject);
    +
    +			config.setBytes("serializedudf", baos.toByteArray());
    --- End diff --
    
    @gaborhermann has extracted these statements into a StreamConfig class; it is currently under review. Thanks for the suggestion.
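
    For readers following along, here is a minimal sketch of what such a wrapper could look like. The class name StreamConfig comes from the comment above, but the method names (and the import) are illustrative assumptions, not the reviewed API; the key strings are mirrored from the diff:

        // Illustrative sketch only: wraps the raw Configuration keys used in
        // createVertex() behind typed setters. Method names are assumptions.
        import org.apache.flink.configuration.Configuration;

        public class StreamConfig {
            private final Configuration config;

            public StreamConfig(Configuration config) {
                this.config = config;
            }

            public void setMutability(boolean isMutable) {
                config.setBoolean("isMutable", isMutable);
            }

            public void setComponentName(String componentName) {
                config.setString("componentName", componentName);
            }

            public void setSerializedInvokable(byte[] serializedUdf) {
                config.setBytes("serializedudf", serializedUdf);
            }
        }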


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15466083
  
    --- Diff: flink-addons/flink-streaming/.travis.yml ---
    @@ -0,0 +1,11 @@
    +
    +language: java
    --- End diff --
    
    I think you can remove this file.
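
    If it is indeed unused, removing it on the pull request branch is a one-liner (path taken from the diff header above):

        git rm flink-addons/flink-streaming/.travis.yml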



[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15466553
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java ---
    @@ -0,0 +1,700 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.api;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.java.functions.FilterFunction;
    +import org.apache.flink.api.java.functions.FlatMapFunction;
    +import org.apache.flink.api.java.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.functions.MapFunction;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
    +import org.apache.flink.streaming.api.collector.OutputSelector;
    +import org.apache.flink.streaming.api.function.co.CoMapFunction;
    +import org.apache.flink.streaming.api.function.sink.SinkFunction;
    +import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
    +import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
    +import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
    +
    +/**
    + * A DataStream represents a stream of elements of the same type. A DataStream
    + * can be transformed into another DataStream by applying a transformation as
    + * for example
    + * <ul>
    + * <li>{@link DataStream#map},</li>
    + * <li>{@link DataStream#filter}, or</li>
    + * <li>{@link DataStream#batchReduce}.</li>
    + * </ul>
    + * 
    + * @param <T>
    + *            The type of the DataStream, i.e., the type of the elements of the
    + *            DataStream.
    + */
    +public class DataStream<T extends Tuple> {
    +
    +	protected static Integer counter = 0;
    +	protected final StreamExecutionEnvironment environment;
    +	protected String id;
    +	protected int degreeOfParallelism;
    +	protected String userDefinedName;
    +	protected OutputSelector<T> outputSelector;
    +	protected List<String> connectIDs;
    +	protected List<ConnectionType> ctypes;
    +	protected List<Integer> cparams;
    +	protected boolean iterationflag;
    +	protected Integer iterationID;
    +
    +	/**
    +	 * Create a new {@link DataStream} in the given execution environment with
    +	 * partitioning set to shuffle by default.
    +	 * 
    +	 * @param environment
    +	 *            StreamExecutionEnvironment
    +	 * @param operatorType
    +	 *            The type of the operator in the component
    +	 */
    +	protected DataStream(StreamExecutionEnvironment environment, String operatorType) {
    +		if (environment == null) {
    +			throw new NullPointerException("context is null");
    +		}
    +
    +		// TODO add name based on component number and preferably a sequential id
    +		counter++;
    +		this.id = operatorType + "-" + counter.toString();
    +		this.environment = environment;
    +		this.degreeOfParallelism = environment.getDegreeOfParallelism();
    +		initConnections();
    +
    +	}
    +
    +	/**
    +	 * Create a new DataStream by creating a copy of another DataStream
    +	 * 
    +	 * @param dataStream
    +	 *            The DataStream that will be copied.
    +	 */
    +	protected DataStream(DataStream<T> dataStream) {
    +		this.environment = dataStream.environment;
    +		this.id = dataStream.id;
    +		this.degreeOfParallelism = dataStream.degreeOfParallelism;
    +		this.userDefinedName = dataStream.userDefinedName;
    +		this.outputSelector = dataStream.outputSelector;
    +		this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
    +		this.ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>(dataStream.ctypes);
    +		this.cparams = new ArrayList<Integer>(dataStream.cparams);
    +		this.iterationflag = dataStream.iterationflag;
    +		this.iterationID = dataStream.iterationID;
    +	}
    +
    +	/**
    +	 * Initialize the connection and partitioning among the connected
    +	 * {@link DataStream}s.
    +	 */
    +	private void initConnections() {
    +		connectIDs = new ArrayList<String>();
    +		connectIDs.add(getId());
    +		ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>();
    +		ctypes.add(ConnectionType.SHUFFLE);
    +		cparams = new ArrayList<Integer>();
    +		cparams.add(0);
    +
    +	}
    +
    +	/**
    +	 * Returns the ID of the {@link DataStream}.
    +	 * 
    +	 * @return ID of the DataStream
    +	 */
    +	public String getId() {
    +		return id;
    +	}
    +
    +	/**
    +	 * Sets the mutability of the operator represented by the DataStream. If
    +	 * the operator is set to mutable, the tuples received in the user defined
    +	 * functions will be reused after the function call. Setting an operator
    +	 * to mutable greatly reduces garbage collection overhead and thus
    +	 * improves scalability.
    +	 * 
    +	 * @param isMutable
    +	 *            The mutability of the operator.
    +	 * @return The DataStream with mutability set.
    +	 */
    +	public DataStream<T> setMutability(boolean isMutable) {
    +		environment.setMutability(this, isMutable);
    +		return this;
    +	}
    +
    +	/**
    +	 * Sets the degree of parallelism for this operator. The degree must be 1 or
    +	 * more.
    +	 * 
    +	 * @param dop
    +	 *            The degree of parallelism for this operator.
    +	 * @return The operator with set degree of parallelism.
    +	 */
    +	public DataStream<T> setParallelism(int dop) {
    +		if (dop < 1) {
    +			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
    +		}
    +		this.degreeOfParallelism = dop;
    +
    +		environment.setOperatorParallelism(this);
    +
    +		return new DataStream<T>(this);
    +
    +	}
    +
    +	/**
    +	 * Gets the degree of parallelism for this operator.
    +	 * 
    +	 * @return The parallelism set for this operator.
    +	 */
    +	public int getParallelism() {
    +		return this.degreeOfParallelism;
    +	}
    +
    +	/**
    +	 * Gives the data transformation (vertex) a user defined name for use
    +	 * with directed outputs. The {@link OutputSelector} of the input vertex
    +	 * should use this name for directed emits.
    +	 * 
    +	 * @param name
    +	 *            The name to set
    +	 * @return The named DataStream.
    +	 */
    +	public DataStream<T> name(String name) {
    +		// copy?
    +		if (name == "") {
    --- End diff --
    
    This is afaik not a check for an empty string: it checks whether the reference held by `name` is the same as that of the `""` literal.
    I think `name.equals("")` (or `name.isEmpty()`) is correct here.
    http://stackoverflow.com/questions/3321526/should-i-use-string-isempty-or-equalsstring
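
    A quick standalone illustration of the difference (not code from the PR):

        String a = "";
        String b = new String("");          // same contents, different object
        System.out.println(a == b);         // false: compares references
        System.out.println(a.equals(b));    // true:  compares contents
        System.out.println(b.isEmpty());    // true:  clearest way to express the intent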



[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15576479
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java ---
    @@ -0,0 +1,98 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.streaming.api.DataStream;
    +import org.apache.flink.streaming.api.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.function.source.SourceFunction;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.util.Collector;
    +
    +public class KafkaTopology {
    +
    +	public static final class MySource extends SourceFunction<Tuple1<String>> {
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public void invoke(Collector<Tuple1<String>> collector) throws Exception {
    +			// TODO Auto-generated method stub
    --- End diff --
    
    The connectors package is under heavy development right now; the refactored code will be pushed to the pull request by the end of the week.



[GitHub] incubator-flink pull request: Streaming addon prototype

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15466266
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTest.java ---
    @@ -0,0 +1,74 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.connectors.rabbitmq;
    +
    +
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.streaming.api.function.sink.SinkFunction;
    +import org.junit.Test;
    +
    +public class RMQTest {
    +	
    +	public static final class MySink extends SinkFunction<Tuple1<String>> {
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public void invoke(Tuple1<String> tuple) {
    +			result.add(tuple.f0);
    +		}
    +
    +		
    +	}
    +	
    +	private static Set<String> expected = new HashSet<String>();
    +	private static Set<String> result = new HashSet<String>();
    +	
    +	@SuppressWarnings("unused")
    +  private static void fillExpected() {
    --- End diff --
    
    broken indentation 
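
    For reference, a minimal fix keeping the file's usual tab indentation (and the annotation) would be:

        	@SuppressWarnings("unused")
        	private static void fillExpected() {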

