You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 20:27:28 UTC
[3/3] git commit: [FLINK-1038] added RemoteCollectorOutputFormat
[FLINK-1038] added RemoteCollectorOutputFormat
This closes #94
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7f946cee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7f946cee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7f946cee
Branch: refs/heads/master
Commit: 7f946cee6735be076ec9b07ac0b67f183df95a40
Parents: 9463e27
Author: Fabian Tschirschnitz <fa...@googlemail.com>
Authored: Thu Sep 11 14:16:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Sep 21 19:55:26 2014 +0200
----------------------------------------------------------------------
.../RemoteCollectorOutputFormatExample.java | 113 +++++++++++
.../flink/api/java/io/RemoteCollector.java | 40 ++++
.../api/java/io/RemoteCollectorConsumer.java | 26 +++
.../flink/api/java/io/RemoteCollectorImpl.java | 194 +++++++++++++++++++
.../java/io/RemoteCollectorOutputFormat.java | 170 ++++++++++++++++
5 files changed, 543 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
new file mode 100644
index 0000000..b9fa921
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java.remotecollectoroutputformat;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.io.RemoteCollectorConsumer;
+import org.apache.flink.api.java.io.RemoteCollectorImpl;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Runs the "WordCount" program, which computes a simple word occurrence
+ * histogram over some sample data, and collects the results twice: once with
+ * a user-defined {@link RemoteCollectorConsumer} and once into a local
+ * collection.
+ */
+@SuppressWarnings("serial")
+public class RemoteCollectorOutputFormatExample {
+
+    public static void main(String[] args) throws Exception {
+
+        // A remote ExecutionEnvironment is created here, because this
+        // OutputFormat is designed for use in a distributed setting. For local
+        // use you should consider using the LocalCollectionOutputFormat.
+        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+                "<remote>", 6124, "/path/to/your/file.jar");
+
+        // sample input
+        DataSet<String> lines = env.fromElements(
+                "To be, or not to be,--that is the question:--",
+                "Whether 'tis nobler in the mind to suffer",
+                "The slings and arrows of outrageous fortune",
+                "Or to take arms against a sea of troubles,");
+
+        // split the lines into (word,1) pairs, group by tuple field "0" and
+        // sum up tuple field "1"
+        DataSet<Tuple2<String, Integer>> wordCounts = lines
+                .flatMap(new LineSplitter())
+                .groupBy(0)
+                .aggregate(Aggregations.SUM, 1);
+
+        // user-defined consumer that prints every record it receives
+        RemoteCollectorConsumer<Tuple2<String, Integer>> printer =
+                new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
+                    @Override
+                    public void collect(Tuple2<String, Integer> element) {
+                        System.out.println("word/occurrences:" + element);
+                    }
+                };
+        RemoteCollectorImpl.collectLocal(wordCounts, printer);
+
+        // additionally gather the results in a local collection
+        Set<Tuple2<String, Integer>> results = new HashSet<Tuple2<String, Integer>>();
+        RemoteCollectorImpl.collectLocal(wordCounts, results);
+
+        // run the program
+        env.execute("WordCount Example with RemoteCollectorOutputFormat");
+
+        System.out.println(results);
+    }
+
+    //
+    // User Functions
+    //
+
+    /**
+     * String tokenizer implemented as a user-defined FlatMapFunction. It
+     * lower-cases a line, splits it at runs of non-word characters and emits
+     * one "(word,1)" pair (Tuple2&lt;String, Integer&gt;) per non-empty token.
+     */
+    public static final class LineSplitter implements
+            FlatMapFunction<String, Tuple2<String, Integer>> {
+
+        @Override
+        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+            String normalized = value.toLowerCase();
+
+            for (String word : normalized.split("\\W+")) {
+                // a line that starts with a separator yields a leading empty token
+                if (word.isEmpty()) {
+                    continue;
+                }
+                out.collect(new Tuple2<String, Integer>(word, 1));
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
new file mode 100644
index 0000000..af021e5
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+
+/**
+ * Counterpart of the {@link RemoteCollectorOutputFormat}: a
+ * {@link java.rmi.Remote} interface whose implementations receive remote
+ * results through the {@code collect} function.
+ *
+ * @param <T>
+ *            The type of the records the collector will receive
+ */
+public interface RemoteCollector<T> extends Remote {
+
+    /**
+     * Receives a single record.
+     *
+     * @param element
+     *            The received record
+     * @throws RemoteException
+     *             If the remote call fails
+     */
+    void collect(T element) throws RemoteException;
+
+    /**
+     * @return The consumer attached to this collector
+     * @throws RemoteException
+     *             If the remote call fails
+     */
+    RemoteCollectorConsumer<T> getConsumer() throws RemoteException;
+
+    /**
+     * Attaches a consumer to this collector.
+     *
+     * @param consumer
+     *            The consumer to attach
+     * @throws RemoteException
+     *             If the remote call fails
+     */
+    void setConsumer(RemoteCollectorConsumer<T> consumer) throws RemoteException;
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
new file mode 100644
index 0000000..25564dc
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+/**
+ * This interface describes consumers of {@link RemoteCollector}
+ * implementations.
+ *
+ * @param <T>
+ *            The type of the consumed records
+ */
+public interface RemoteCollectorConsumer<T> {
+
+    /**
+     * Handles a single record.
+     *
+     * @param element
+     *            The record to consume
+     */
+    void collect(T element);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
new file mode 100644
index 0000000..f79833f
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.rmi.AlreadyBoundException;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.UUID;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+
+/**
+ * This class provides a counterpart implementation for the
+ * {@link RemoteCollectorOutputFormat}: an RMI-exported {@link RemoteCollector}
+ * that forwards every received record to a {@link RemoteCollectorConsumer}.
+ */
+public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements
+        RemoteCollector<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Instance of an implementation of a {@link RemoteCollectorConsumer}. This
+     * instance will get the records passed.
+     */
+    private RemoteCollectorConsumer<T> consumer;
+
+    /**
+     * This factory method creates an instance of the
+     * {@link RemoteCollectorImpl} and binds it in a newly created local RMI
+     * {@link Registry}.
+     *
+     * @param port
+     *            The port where the local collector is listening.
+     * @param consumer
+     *            The consumer instance.
+     * @param rmiId
+     *            An ID to register the collector in the RMI registry.
+     * @throws RuntimeException
+     *             If the collector cannot be exported, the registry cannot be
+     *             created on the given port, or the ID is already bound.
+     */
+    public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) {
+        try {
+            RemoteCollectorImpl<T> collectorInstance = new RemoteCollectorImpl<T>();
+            // attach the consumer before the collector becomes reachable, so
+            // that no remote call can find a collector without a consumer
+            collectorInstance.setConsumer(consumer);
+
+            Registry registry = LocateRegistry.createRegistry(port);
+            registry.bind(rmiId, collectorInstance);
+        } catch (RemoteException e) {
+            // a collector that is not exported and bound is unusable, so fail
+            // here instead of letting the job fail later on the remote side
+            throw new RuntimeException(String.format(
+                    "Could not export the collector or create the RMI registry on port %s.", port), e);
+        } catch (AlreadyBoundException e) {
+            throw new RuntimeException(String.format(
+                    "The RMI ID %s is already bound in the registry on port %s.", rmiId, port), e);
+        }
+    }
+
+    /**
+     * Writes a DataSet to a {@link RemoteCollectorConsumer} through an
+     * {@link RemoteCollector} remotely called from the
+     * {@link RemoteCollectorOutputFormat}.<br/>
+     *
+     * @param source
+     *            The DataSet to collect.
+     * @param consumer
+     *            The consumer that receives the records.
+     * @return The DataSink that writes the DataSet.
+     * @throws IllegalStateException
+     *             If no address for the remote side to connect to can be
+     *             determined.
+     * @throws RuntimeException
+     *             If the network interfaces cannot be queried, no free port
+     *             can be obtained, or the collector cannot be bound.
+     */
+    public static <T> DataSink<T> collectLocal(DataSet<T> source,
+            RemoteCollectorConsumer<T> consumer) {
+        // if the RMI parameter was not set by the user make a "good guess"
+        String ip = System.getProperty("java.rmi.server.hostname");
+        if (ip == null) {
+            Enumeration<NetworkInterface> networkInterfaces;
+            try {
+                networkInterfaces = NetworkInterface.getNetworkInterfaces();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            // the last non-loopback IPv4 address that is found wins
+            while (networkInterfaces != null && networkInterfaces.hasMoreElements()) {
+                Enumeration<InetAddress> inetAddresses = networkInterfaces
+                        .nextElement().getInetAddresses();
+                while (inetAddresses.hasMoreElements()) {
+                    InetAddress inetAddress = inetAddresses.nextElement();
+                    if (!inetAddress.isLoopbackAddress()
+                            && inetAddress instanceof Inet4Address) {
+                        ip = inetAddress.getHostAddress();
+                    }
+                }
+            }
+            if (ip == null) {
+                // fail before a registry is created; otherwise the output
+                // format constructor would reject the missing remote later
+                throw new IllegalStateException(
+                        "Could not determine a non-loopback IPv4 address of this host. "
+                        + "Set the system property java.rmi.server.hostname.");
+            }
+            System.setProperty("java.rmi.server.hostname", ip);
+        }
+
+        // get some random free port; the probe socket is always closed again
+        Integer randomPort;
+        try {
+            ServerSocket probe = new ServerSocket(0);
+            try {
+                randomPort = probe.getLocalPort();
+            } finally {
+                probe.close();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        // create an ID for this output format instance
+        String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());
+
+        // create the local listening object and bind it to the RMI registry
+        RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);
+
+        // create and configure the output format
+        OutputFormat<T> remoteCollectorOutputFormat = new RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId);
+
+        // create sink
+        return source.output(remoteCollectorOutputFormat);
+    }
+
+    /**
+     * Writes a DataSet to a local {@link Collection} through an
+     * {@link RemoteCollector} and a standard {@link RemoteCollectorConsumer}
+     * implementation remotely called from the
+     * {@link RemoteCollectorOutputFormat}.<br/>
+     *
+     * @param source
+     *            The DataSet to collect.
+     * @param collection
+     *            The collection the received records are added to. Additions
+     *            go through a synchronized wrapper around this collection.
+     */
+    public static <T> void collectLocal(DataSet<T> source,
+            Collection<T> collection) {
+        final Collection<T> synchronizedCollection = Collections
+                .synchronizedCollection(collection);
+        collectLocal(source, new RemoteCollectorConsumer<T>() {
+            @Override
+            public void collect(T element) {
+                synchronizedCollection.add(element);
+            }
+        });
+    }
+
+    /**
+     * Necessary private default constructor: the super constructor declares
+     * {@link RemoteException}.
+     *
+     * @throws RemoteException
+     *             If the super constructor fails
+     */
+    private RemoteCollectorImpl() throws RemoteException {
+        super();
+    }
+
+    /**
+     * This method is called by the remote to collect records. It passes the
+     * record on to the consumer.
+     */
+    @Override
+    public void collect(T element) throws RemoteException {
+        this.consumer.collect(element);
+    }
+
+    @Override
+    public RemoteCollectorConsumer<T> getConsumer() {
+        return this.consumer;
+    }
+
+    @Override
+    public void setConsumer(RemoteCollectorConsumer<T> consumer) {
+        this.consumer = consumer;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7f946cee/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
new file mode 100644
index 0000000..b13b2ff
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import java.io.IOException;
+import java.rmi.AccessException;
+import java.rmi.NotBoundException;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An output format that sends results through Java RMI to a
+ * {@link RemoteCollector} implementation. The client has to provide an
+ * implementation of {@link RemoteCollector} and has to write its plan's output
+ * into an instance of {@link RemoteCollectorOutputFormat}. Furthermore, the
+ * client's VM parameter -Djava.rmi.server.hostname should be set to the
+ * client's own IP address.
+ */
+public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
+
+    private static final long serialVersionUID = 1922744224032398102L;
+
+    /**
+     * The reference of the {@link RemoteCollector} object
+     */
+    private transient RemoteCollector<T> remoteCollector;
+
+    /**
+     * The RMI registry the collector is looked up in
+     */
+    private transient Registry registry;
+
+    /**
+     * Config parameter for the remote's port number
+     */
+    public static final String PORT = "port";
+    /**
+     * Config parameter for the remote's address
+     */
+    public static final String REMOTE = "remote";
+    /**
+     * Config parameter for the ID the collector is bound to in the RMI registry
+     */
+    public static final String RMI_ID = "rmiId";
+
+    private String remote;
+
+    private int port;
+
+    private String rmiId;
+
+    /**
+     * Create a new {@link RemoteCollectorOutputFormat} instance. The remote and
+     * port for this output are by default localhost:8888 but can be configured
+     * via a {@link Configuration} object. There is no default registry ID; it
+     * has to be supplied through {@link #configure(Configuration)}, which
+     * validates the settings.
+     *
+     * @see RemoteCollectorOutputFormat#REMOTE
+     * @see RemoteCollectorOutputFormat#PORT
+     * @see RemoteCollectorOutputFormat#RMI_ID
+     */
+    public RemoteCollectorOutputFormat() {
+        // Do not delegate to the three-argument constructor: it rejects the
+        // missing registry ID, which would make this constructor always throw.
+        this.remote = "localhost";
+        this.port = 8888;
+    }
+
+    /**
+     * Creates a new {@link RemoteCollectorOutputFormat} instance for the
+     * specified remote and port.
+     *
+     * @param remote
+     *            The address of the host running the RMI registry
+     * @param port
+     *            The port of the RMI registry
+     * @param rmiId
+     *            The ID the {@link RemoteCollector} is bound to in the registry
+     * @throws IllegalStateException
+     *             If the remote or the registry ID is null
+     */
+    public RemoteCollectorOutputFormat(String remote, int port, String rmiId) {
+        super();
+        this.remote = remote;
+        this.port = port;
+        this.rmiId = rmiId;
+
+        checkConfigured();
+    }
+
+    /**
+     * This method receives the Configuration object. The fields "remote",
+     * "port" and "rmiId" override the current values when they are set; the
+     * current values are used as defaults otherwise.
+     *
+     * @throws IllegalStateException
+     *             If the remote or the registry ID is still missing afterwards
+     */
+    @Override
+    public void configure(Configuration parameters) {
+        this.remote = parameters.getString(REMOTE, this.remote);
+        this.port = parameters.getInteger(PORT, this.port);
+        this.rmiId = parameters.getString(RMI_ID, this.rmiId);
+
+        checkConfigured();
+    }
+
+    /**
+     * Looks up the {@link RemoteCollector} in the remote's RMI registry.
+     *
+     * @throws IllegalStateException
+     *             If the format is not fully configured, or the registry or
+     *             the collector cannot be obtained
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        // guards against an instance that was never configured
+        checkConfigured();
+
+        // get the remote's RMI Registry
+        try {
+            registry = LocateRegistry.getRegistry(this.remote, this.port);
+        } catch (RemoteException e) {
+            throw new IllegalStateException(e);
+        }
+
+        // try to get an instance of a RemoteCollector implementation
+        try {
+            this.remoteCollector = (RemoteCollector<T>) registry
+                    .lookup(this.rmiId);
+        } catch (RemoteException e) {
+            // also covers AccessException, which is a RemoteException
+            throw new IllegalStateException(e);
+        } catch (NotBoundException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * This method forwards records simply to the remote's
+     * {@link RemoteCollector} implementation
+     */
+    @Override
+    public void writeRecord(T record) throws IOException {
+        remoteCollector.collect(record);
+    }
+
+    /**
+     * Does nothing: the reference to the {@link RemoteCollector} is kept and
+     * nothing is unbound from the registry.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public String toString() {
+        return "RemoteCollectorOutputFormat(" + remote + ":" + port + ", "
+                + rmiId + ")";
+    }
+
+    /**
+     * Verifies that both the remote and the registry ID are set.
+     */
+    private void checkConfigured() {
+        if (this.remote == null) {
+            throw new IllegalStateException(String.format(
+                    "No remote configured for %s.", this));
+        }
+
+        if (this.rmiId == null) {
+            throw new IllegalStateException(String.format(
+                    "No registry ID configured for %s.", this));
+        }
+    }
+
+}