You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 20:27:28 UTC

[3/3] git commit: [FLINK-1038] added RemoteCollectorOutputFormat

[FLINK-1038] added RemoteCollectorOutputFormat

This closes #94


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7f946cee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7f946cee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7f946cee

Branch: refs/heads/master
Commit: 7f946cee6735be076ec9b07ac0b67f183df95a40
Parents: 9463e27
Author: Fabian Tschirschnitz <fa...@googlemail.com>
Authored: Thu Sep 11 14:16:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Sep 21 19:55:26 2014 +0200

----------------------------------------------------------------------
 .../RemoteCollectorOutputFormatExample.java     | 113 +++++++++++
 .../flink/api/java/io/RemoteCollector.java      |  40 ++++
 .../api/java/io/RemoteCollectorConsumer.java    |  26 +++
 .../flink/api/java/io/RemoteCollectorImpl.java  | 194 +++++++++++++++++++
 .../java/io/RemoteCollectorOutputFormat.java    | 170 ++++++++++++++++
 5 files changed, 543 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
new file mode 100644
index 0000000..b9fa921
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java.remotecollectoroutputformat;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.io.RemoteCollectorConsumer;
+import org.apache.flink.api.java.io.RemoteCollectorImpl;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over some sample data and collects the results with an
+ * implementation of a {@link RemoteCollectorConsumer}.
+ *
+ * <p>The result is delivered twice: once to a consumer that prints every
+ * record, and once into a local {@link Set} that is printed after the program
+ * has been executed.
+ */
+@SuppressWarnings("serial")
+public class RemoteCollectorOutputFormatExample {
+
+	public static void main(String[] args) throws Exception {
+
+		// We create a remote ExecutionEnvironment here, because the
+		// RemoteCollectorOutputFormat is designed for use in a distributed setting.
+		// For local use you should consider using the LocalCollectionOutputFormat.
+		// Replace host, port and jar path with the values of your setup.
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.createRemoteEnvironment("<remote>", 6124,
+						"/path/to/your/file.jar");
+
+		// get input data
+		DataSet<String> text = env.fromElements(
+				"To be, or not to be,--that is the question:--",
+				"Whether 'tis nobler in the mind to suffer",
+				"The slings and arrows of outrageous fortune",
+				"Or to take arms against a sea of troubles,");
+
+		DataSet<Tuple2<String, Integer>> counts = text
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				.flatMap(new LineSplitter())
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0).aggregate(Aggregations.SUM, 1);
+
+		// emit result to a user-defined RemoteCollectorConsumer that prints every record
+		RemoteCollectorImpl.collectLocal(counts,
+				new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
+					@Override
+					public void collect(Tuple2<String, Integer> element) {
+						System.out.println("word/occurrences:" + element);
+					}
+				});
+
+		// local collection to store results in
+		Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
+		// collect results from remote in local collection
+		RemoteCollectorImpl.collectLocal(counts, collection);
+
+		// execute program
+		env.execute("WordCount Example with RemoteCollectorOutputFormat");
+
+		System.out.println(collection);
+	}
+
+	//
+	// User Functions
+	//
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)"
+	 * ({@code Tuple2<String, Integer>}). Lines are lower-cased with
+	 * {@link java.util.Locale#ROOT} so that tokenizing does not depend on the
+	 * default locale of the executing JVM.
+	 */
+	public static final class LineSplitter implements
+			FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize (locale-independently) and split the line
+			String[] tokens = value.toLowerCase(java.util.Locale.ROOT).split("\\W+");
+
+			// emit the pairs; split() can yield a leading empty token
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
new file mode 100644
index 0000000..af021e5
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+
+/**
+ * Remote counterpart of the {@link RemoteCollectorOutputFormat}. An
+ * implementation is looked up through Java RMI by the output format, which
+ * hands every written record to {@link #collect(Object)}.
+ *
+ * @param <T>
+ *            The type of the records the collector will receive
+ */
+public interface RemoteCollector<T> extends Remote {
+
+	/**
+	 * Receives one record sent by a remote output format.
+	 *
+	 * @param element
+	 *            the record to collect
+	 * @throws RemoteException
+	 *             if the remote invocation fails
+	 */
+	void collect(T element) throws RemoteException;
+
+	/**
+	 * Returns the {@link RemoteCollectorConsumer} currently associated with
+	 * this collector.
+	 *
+	 * @throws RemoteException
+	 *             if the remote invocation fails
+	 */
+	RemoteCollectorConsumer<T> getConsumer() throws RemoteException;
+
+	/**
+	 * Associates a {@link RemoteCollectorConsumer} with this collector.
+	 *
+	 * @param consumer
+	 *            the consumer to associate
+	 * @throws RemoteException
+	 *             if the remote invocation fails
+	 */
+	void setConsumer(RemoteCollectorConsumer<T> consumer) throws RemoteException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
new file mode 100644
index 0000000..25564dc
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+/**
+ * Callback that receives the records arriving at a {@link RemoteCollector}
+ * implementation.
+ *
+ * <p>The Java RMI specification does not guarantee on which thread a remote
+ * call is dispatched, so implementations should be prepared to be called from
+ * several threads.
+ *
+ * @param <T>
+ *            the type of the consumed records
+ */
+public interface RemoteCollectorConsumer<T> {
+
+	/**
+	 * Consumes a single record.
+	 *
+	 * @param element
+	 *            the record to consume
+	 */
+	void collect(T element);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
new file mode 100644
index 0000000..f79833f
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.rmi.AlreadyBoundException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+
+/**
+ * This class provides a counterpart implementation for the
+ * {@link RemoteCollectorOutputFormat}.
+ */
+
+public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements
+		RemoteCollector<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Instance of an implementation of a {@link RemoteCollectorConsumer}. This
+	 * instance will get the records passed.
+	 */
+
+	private RemoteCollectorConsumer<T> consumer;
+
+	/**
+	 * This factory method creates an instance of the
+	 * {@link RemoteCollectorImpl} and binds it in the local RMI
+	 * {@link Registry}.
+	 * 
+	 * @param port
+	 *            The port where the local colector is listening.
+	 * @param consumer
+	 *            The consumer instance.
+	 * @param rmiId 
+	 * 	          An ID to register the collector in the RMI registry.
+	 * @return
+	 */
+	public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) {
+		RemoteCollectorImpl<T> collectorInstance = null;
+
+		try {
+			collectorInstance = new RemoteCollectorImpl<T>();
+
+			Registry registry;
+
+			registry = LocateRegistry.createRegistry(port);
+			registry.bind(rmiId, collectorInstance);
+		} catch (RemoteException e) {
+			e.printStackTrace();
+		} catch (AlreadyBoundException e) {
+			e.printStackTrace();
+		}
+
+		collectorInstance.setConsumer(consumer);
+	}
+
+	/**
+	 * Writes a DataSet to a {@link RemoteCollectorConsumer} through an
+	 * {@link RemoteCollector} remotely called from the
+	 * {@link RemoteCollectorOutputFormat}.<br/>
+	 * 
+	 * @return The DataSink that writes the DataSet.
+	 */
+	public static <T> DataSink<T> collectLocal(DataSet<T> source,
+			RemoteCollectorConsumer<T> consumer) {
+		// if the RMI parameter was not set by the user make a "good guess"
+		String ip = System.getProperty("java.rmi.server.hostname");
+		if (ip == null) {
+			Enumeration<NetworkInterface> networkInterfaces = null;
+			try {
+				networkInterfaces = NetworkInterface.getNetworkInterfaces();
+			} catch (Throwable t) {
+				throw new RuntimeException(t);
+			}
+			while (networkInterfaces.hasMoreElements()) {
+				NetworkInterface networkInterface = (NetworkInterface) networkInterfaces
+						.nextElement();
+				Enumeration<InetAddress> inetAddresses = networkInterface
+						.getInetAddresses();
+				while (inetAddresses.hasMoreElements()) {
+					InetAddress inetAddress = (InetAddress) inetAddresses
+							.nextElement();
+					if (!inetAddress.isLoopbackAddress()
+							&& inetAddress instanceof Inet4Address) {
+						ip = inetAddress.getHostAddress();
+						System.setProperty("java.rmi.server.hostname", ip);
+					}
+				}
+			}
+		}
+
+		// get some random free port
+		Integer randomPort = 0;
+		try {
+			ServerSocket tmp = new ServerSocket(0);
+			randomPort = tmp.getLocalPort();
+			tmp.close();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+
+		// create an ID for this output format instance
+		String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());
+		
+		// create the local listening object and bind it to the RMI registry
+		RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);
+
+		// create and configure the output format
+		OutputFormat<T> remoteCollectorOutputFormat = new RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId);
+
+		// create sink
+		return source.output(remoteCollectorOutputFormat);
+	}
+
+	/**
+	 * Writes a DataSet to a local {@link Collection} through an
+	 * {@link RemoteCollector} and a standard {@link RemoteCollectorConsumer}
+	 * implementation remotely called from the
+	 * {@link RemoteCollectorOutputFormat}.<br/>
+	 * 
+	 * @param local
+	 * @param port
+	 * @param collection
+	 */
+	public static <T> void collectLocal(DataSet<T> source,
+			Collection<T> collection) {
+		final Collection<T> synchronizedCollection = Collections
+				.synchronizedCollection(collection);
+		collectLocal(source, new RemoteCollectorConsumer<T>() {
+			@Override
+			public void collect(T element) {
+				synchronizedCollection.add(element);
+			}
+		});
+	}
+
+	/**
+	 * Necessary private default constructor.
+	 * 
+	 * @throws RemoteException
+	 */
+	private RemoteCollectorImpl() throws RemoteException {
+		super();
+	}
+
+	/**
+	 * This method is called by the remote to collect records.
+	 */
+	@Override
+	public void collect(T element) throws RemoteException {
+		this.consumer.collect(element);
+	}
+
+	@Override
+	public RemoteCollectorConsumer<T> getConsumer() {
+		return this.consumer;
+	}
+
+	@Override
+	public void setConsumer(RemoteCollectorConsumer<T> consumer) {
+		this.consumer = consumer;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
new file mode 100644
index 0000000..b13b2ff
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import java.io.IOException;
+import java.rmi.AccessException;
+import java.rmi.NotBoundException;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An output format that sends results through Java RMI to a
+ * {@link RemoteCollector} implementation. The client has to provide an
+ * implementation of {@link RemoteCollector} and has to write its plan's output
+ * into an instance of {@link RemoteCollectorOutputFormat}. Further, in the
+ * client's VM parameters -Djava.rmi.server.hostname should be set to the own IP
+ * address.
+ *
+ * <p>Every record is sent with a separate remote call (see
+ * {@link #writeRecord(Object)}).
+ */
+public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
+
+	private static final long serialVersionUID = 1922744224032398102L;
+
+	/** Remote host used by the no-argument constructor. */
+	private static final String DEFAULT_REMOTE = "localhost";
+
+	/** Registry port used by the no-argument constructor. */
+	private static final int DEFAULT_PORT = 8888;
+
+	/**
+	 * The reference of the {@link RemoteCollector} object, obtained in
+	 * {@link #open(int, int)}.
+	 */
+	private transient RemoteCollector<T> remoteCollector;
+
+	/** The remote's RMI registry, obtained in {@link #open(int, int)}. */
+	private transient Registry registry;
+
+	/**
+	 * Config parameter for the remote's port number
+	 */
+	public static final String PORT = "port";
+	/**
+	 * Config parameter for the remote's address
+	 */
+	public static final String REMOTE = "remote";
+	/**
+	 * Config parameter for the ID under which the collector is bound in the
+	 * remote's RMI registry
+	 */
+	public static final String RMI_ID = "rmiId";
+
+	private String remote;
+
+	private int port;
+
+	private String rmiId;
+
+	/**
+	 * Create a new {@link RemoteCollectorOutputFormat} instance. The remote and
+	 * port for this output are by default localhost:8888. These, and the
+	 * mandatory registry ID, can be configured via a {@link Configuration}
+	 * object passed to {@link #configure(Configuration)}.
+	 * 
+	 * @see RemoteCollectorOutputFormat#REMOTE
+	 * @see RemoteCollectorOutputFormat#PORT
+	 * @see RemoteCollectorOutputFormat#RMI_ID
+	 */
+	public RemoteCollectorOutputFormat() {
+		// deliberately not validated here: the registry ID is only known once
+		// configure() has been called
+		this.remote = DEFAULT_REMOTE;
+		this.port = DEFAULT_PORT;
+		this.rmiId = null;
+	}
+
+	/**
+	 * Creates a new {@link RemoteCollectorOutputFormat} instance for the
+	 * specified remote and port.
+	 * 
+	 * @param remote
+	 *            The host of the remote's RMI registry.
+	 * @param port
+	 *            The port of the remote's RMI registry.
+	 * @param rmiId
+	 *            The ID under which the {@link RemoteCollector} is bound.
+	 * @throws IllegalStateException
+	 *             if {@code remote} or {@code rmiId} is {@code null}.
+	 */
+	public RemoteCollectorOutputFormat(String remote, int port, String rmiId) {
+		super();
+		this.remote = remote;
+		this.port = port;
+		this.rmiId = rmiId;
+		validate();
+	}
+
+	/**
+	 * Reads the remote, port and registry ID from the given configuration.
+	 * Keys that are not present keep the current value.
+	 *
+	 * @throws IllegalStateException
+	 *             if afterwards no remote or no registry ID is set.
+	 */
+	@Override
+	public void configure(Configuration parameters) {
+		this.remote = parameters.getString(REMOTE, this.remote);
+		this.port = parameters.getInteger(PORT, this.port);
+		this.rmiId = parameters.getString(RMI_ID, this.rmiId);
+		validate();
+	}
+
+	/**
+	 * Looks up the {@link RemoteCollector} bound under the configured ID in
+	 * the remote's RMI registry.
+	 *
+	 * @throws IllegalStateException
+	 *             if the format is not fully configured, or the registry or the
+	 *             collector cannot be reached.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		// guard against instances whose registry ID was never configured
+		validate();
+
+		// get the remote's RMI Registry
+		try {
+			registry = LocateRegistry.getRegistry(this.remote, this.port);
+		} catch (RemoteException e) {
+			throw new IllegalStateException(String.format(
+					"Could not locate the RMI registry for %s.", this), e);
+		}
+
+		// try to get an instance of a RemoteCollector implementation
+		try {
+			this.remoteCollector = (RemoteCollector<T>) registry
+					.lookup(this.rmiId);
+		} catch (RemoteException e) {
+			// also covers AccessException, a subclass of RemoteException
+			throw new IllegalStateException(String.format(
+					"Could not look up the remote collector for %s.", this), e);
+		} catch (NotBoundException e) {
+			throw new IllegalStateException(String.format(
+					"No remote collector is bound for %s.", this), e);
+		}
+	}
+
+	/**
+	 * This method forwards records simply to the remote's
+	 * {@link RemoteCollector} implementation
+	 */
+	@Override
+	public void writeRecord(T record) throws IOException {
+		remoteCollector.collect(record);
+	}
+
+	/**
+	 * Drops this instance's references to the remote's registry and
+	 * {@link RemoteCollector}. The collector itself stays bound in the remote
+	 * registry, because other parallel instances of this format look it up
+	 * under the same ID.
+	 */
+	@Override
+	public void close() throws IOException {
+		this.remoteCollector = null;
+		this.registry = null;
+	}
+
+	@Override
+	public String toString() {
+		return "RemoteCollectorOutputFormat(" + remote + ":" + port + ", "
+				+ rmiId + ")";
+	}
+
+	/**
+	 * Checks that a remote and a registry ID are set.
+	 *
+	 * @throws IllegalStateException
+	 *             if one of them is missing.
+	 */
+	private void validate() {
+		if (this.remote == null) {
+			throw new IllegalStateException(String.format(
+					"No remote configured for %s.", this));
+		}
+
+		if (this.rmiId == null) {
+			throw new IllegalStateException(String.format(
+					"No registry ID configured for %s.", this));
+		}
+	}
+
+}