You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/05 16:24:36 UTC
[2/4] flink git commit: [FLINK-1882] Removed RemotedCollector classes
[FLINK-1882] Removed RemotedCollector classes
This closes #985
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/100e8c5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/100e8c5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/100e8c5f
Branch: refs/heads/master
Commit: 100e8c5ff9f6d25b3d5db326a5f31b9c4432e334
Parents: 5546a1e
Author: zentol <s....@web.de>
Authored: Sat Jul 25 15:17:55 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200
----------------------------------------------------------------------
.../RemoteCollectorOutputFormatExample.java | 114 ----------
.../flink/api/java/io/RemoteCollector.java | 46 ----
.../api/java/io/RemoteCollectorConsumer.java | 26 ---
.../flink/api/java/io/RemoteCollectorImpl.java | 228 -------------------
.../java/io/RemoteCollectorOutputFormat.java | 175 --------------
5 files changed, 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
deleted file mode 100644
index f524718..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.misc;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.io.RemoteCollectorConsumer;
-import org.apache.flink.api.java.io.RemoteCollectorImpl;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over some sample data and collects the results with an
- * implementation of a {@link RemoteCollectorConsumer}.
- */
-@SuppressWarnings("serial")
-public class RemoteCollectorOutputFormatExample {
-
- public static void main(String[] args) throws Exception {
-
- /**
- * We create a remote {@link ExecutionEnvironment} here, because this
- * OutputFormat is designed for use in a distributed setting. For local
- * use you should consider using the {@link LocalCollectionOutputFormat
- * <T>}.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
- "/path/to/your/file.jar");
-
- // get input data
- DataSet<String> text = env.fromElements(
- "To be, or not to be,--that is the question:--",
- "Whether 'tis nobler in the mind to suffer",
- "The slings and arrows of outrageous fortune",
- "Or to take arms against a sea of troubles,");
-
- DataSet<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new LineSplitter())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).aggregate(Aggregations.SUM, 1);
-
- // emit result
- RemoteCollectorImpl.collectLocal(counts,
- new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
- // user defined IRemoteCollectorConsumer
- @Override
- public void collect(Tuple2<String, Integer> element) {
- System.out.println("word/occurrences:" + element);
- }
- });
-
- // local collection to store results in
- Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
- // collect results from remote in local collection
- RemoteCollectorImpl.collectLocal(counts, collection);
-
- // execute program
- env.execute("WordCount Example with RemoteCollectorOutputFormat");
-
- System.out.println(collection);
-
- RemoteCollectorImpl.shutdownAll();
- }
-
- //
- // User Functions
- //
-
- /**
- * Implements the string tokenizer that splits sentences into words as a
- * user-defined FlatMapFunction. The function takes a line (String) and
- * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
- * Integer>).
- */
- public static final class LineSplitter implements
- FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
deleted file mode 100644
index bcfc332..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.java.DataSet;
-
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-
-/**
- * This interface is the counterpart to the {@link RemoteCollectorOutputFormat}
- * and implementations will receive remote results through the collect function.
- *
- * @param <T>
- * The type of the records the collector will receive
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public interface RemoteCollector<T> extends Remote {
-
- public void collect(T element) throws RemoteException;
-
- public RemoteCollectorConsumer<T> getConsumer() throws RemoteException;
-
- public void setConsumer(RemoteCollectorConsumer<T> consumer)
- throws RemoteException;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
deleted file mode 100644
index 439c6af..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-/**
- * This interface describes consumers of {@link RemoteCollector} implementations.
- */
-public interface RemoteCollectorConsumer<T> {
- public void collect(T element);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
deleted file mode 100644
index 2d080ab..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.rmi.AccessException;
-import java.rmi.AlreadyBoundException;
-import java.rmi.NotBoundException;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-
-/**
- * This class provides a counterpart implementation for the
- * {@link RemoteCollectorOutputFormat}.
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-
-@Deprecated
-public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements
- RemoteCollector<T> {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Instance of an implementation of a {@link RemoteCollectorConsumer}. This
- * instance will get the records passed.
- */
-
- private RemoteCollectorConsumer<T> consumer;
-
- /**
- * This list stores all created {@link Registry}s to unbind and unexport all
- * exposed {@link Remote} objects ({@link RemoteCollectorConsumer} in our
- * case) in the shutdown phase.
- */
- private static List<Registry> registries = new ArrayList<Registry>();
-
- /**
- * This factory method creates an instance of the
- * {@link RemoteCollectorImpl} and binds it in the local RMI
- * {@link Registry}.
- *
- * @param port
- * The port where the local colector is listening.
- * @param consumer
- * The consumer instance.
- * @param rmiId
- * An ID to register the collector in the RMI registry.
- */
- public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) {
- RemoteCollectorImpl<T> collectorInstance = null;
-
- try {
- collectorInstance = new RemoteCollectorImpl<T>();
-
- Registry registry;
-
- registry = LocateRegistry.createRegistry(port);
- registry.bind(rmiId, collectorInstance);
-
- registries.add(registry);
- } catch (RemoteException e) {
- e.printStackTrace();
- } catch (AlreadyBoundException e) {
- e.printStackTrace();
- }
-
- collectorInstance.setConsumer(consumer);
- }
-
- /**
- * Writes a DataSet to a {@link RemoteCollectorConsumer} through an
- * {@link RemoteCollector} remotely called from the
- * {@link RemoteCollectorOutputFormat}.<br/>
- *
- * @return The DataSink that writes the DataSet.
- */
- public static <T> DataSink<T> collectLocal(DataSet<T> source,
- RemoteCollectorConsumer<T> consumer) {
- // if the RMI parameter was not set by the user make a "good guess"
- String ip = System.getProperty("java.rmi.server.hostname");
- if (ip == null) {
- Enumeration<NetworkInterface> networkInterfaces = null;
- try {
- networkInterfaces = NetworkInterface.getNetworkInterfaces();
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- while (networkInterfaces.hasMoreElements()) {
- NetworkInterface networkInterface = (NetworkInterface) networkInterfaces
- .nextElement();
- Enumeration<InetAddress> inetAddresses = networkInterface
- .getInetAddresses();
- while (inetAddresses.hasMoreElements()) {
- InetAddress inetAddress = (InetAddress) inetAddresses
- .nextElement();
- if (!inetAddress.isLoopbackAddress()
- && inetAddress instanceof Inet4Address) {
- ip = inetAddress.getHostAddress();
- System.setProperty("java.rmi.server.hostname", ip);
- }
- }
- }
- }
-
- // get some random free port
- Integer randomPort = 0;
- try {
- ServerSocket tmp = new ServerSocket(0);
- randomPort = tmp.getLocalPort();
- tmp.close();
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
-
- // create an ID for this output format instance
- String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());
-
- // create the local listening object and bind it to the RMI registry
- RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);
-
- // create and configure the output format
- OutputFormat<T> remoteCollectorOutputFormat = new RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId);
-
- // create sink
- return source.output(remoteCollectorOutputFormat);
- }
-
- /**
- * Writes a DataSet to a local {@link Collection} through an
- * {@link RemoteCollector} and a standard {@link RemoteCollectorConsumer}
- * implementation remotely called from the
- * {@link RemoteCollectorOutputFormat}.<br/>
- *
- * @param source the source data set
- * @param collection the local collection
- */
- public static <T> void collectLocal(DataSet<T> source,
- Collection<T> collection) {
- final Collection<T> synchronizedCollection = Collections
- .synchronizedCollection(collection);
- collectLocal(source, new RemoteCollectorConsumer<T>() {
- @Override
- public void collect(T element) {
- synchronizedCollection.add(element);
- }
- });
- }
-
- /**
- * Necessary private default constructor.
- *
- * @throws RemoteException
- */
- private RemoteCollectorImpl() throws RemoteException {
- super();
- }
-
- /**
- * This method is called by the remote to collect records.
- */
- @Override
- public void collect(T element) throws RemoteException {
- this.consumer.collect(element);
- }
-
- @Override
- public RemoteCollectorConsumer<T> getConsumer() {
- return this.consumer;
- }
-
- @Override
- public void setConsumer(RemoteCollectorConsumer<T> consumer) {
- this.consumer = consumer;
- }
-
- /**
- * This method unbinds and unexports all exposed {@link Remote} objects
- *
- * @throws AccessException
- * @throws RemoteException
- * @throws NotBoundException
- */
- public static void shutdownAll() throws AccessException, RemoteException, NotBoundException {
- for (Registry registry : registries) {
- for (String id : registry.list()) {
- Remote remote = registry.lookup(id);
- registry.unbind(id);
- UnicastRemoteObject.unexportObject(remote, true);
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
deleted file mode 100644
index 3fe5cef..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-import java.rmi.AccessException;
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * An output format that sends results through JAVA RMI to an
- * {@link RemoteCollector} implementation. The client has to provide an
- * implementation of {@link RemoteCollector} and has to write it's plan's output
- * into an instance of {@link RemoteCollectorOutputFormat}. Further in the
- * client's VM parameters -Djava.rmi.server.hostname should be set to the own IP
- * address.
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
-
- private static final long serialVersionUID = 1922744224032398102L;
-
- /**
- * The reference of the {@link RemoteCollector} object
- */
- private transient RemoteCollector<T> remoteCollector;
-
- transient private Registry registry;
-
- /**
- * Config parameter for the remote's port number
- */
- public static final String PORT = "port";
- /**
- * Config parameter for the remote's address
- */
- public static final String REMOTE = "remote";
- /**
- * An id used necessary for Java RMI
- */
- public static final String RMI_ID = "rmiId";
-
- private String remote;
-
- private int port;
-
- private String rmiId;
-
- /**
- * Create a new {@link RemoteCollectorOutputFormat} instance. The remote and
- * port for this output are by default localhost:8888 but can be configured
- * via a {@link Configuration} object.
- *
- * @see RemoteCollectorOutputFormat#REMOTE
- * @see RemoteCollectorOutputFormat#PORT
- */
- public RemoteCollectorOutputFormat() {
- this("localhost", 8888, null);
- }
-
- /**
- * Creates a new {@link RemoteCollectorOutputFormat} instance for the
- * specified remote and port.
- *
- * @param rmiId
- */
- public RemoteCollectorOutputFormat(String remote, int port, String rmiId) {
- super();
- this.remote = remote;
- this.port = port;
- this.rmiId = rmiId;
-
- if (this.remote == null) {
- throw new IllegalStateException(String.format(
- "No remote configured for %s.", this));
- }
-
- if (this.rmiId == null) {
- throw new IllegalStateException(String.format(
- "No registry ID configured for %s.", this));
- }
- }
-
- @Override
- /**
- * This method receives the Configuration object, where the fields "remote" and "port" must be set.
- */
- public void configure(Configuration parameters) {
- this.remote = parameters.getString(REMOTE, this.remote);
- this.port = parameters.getInteger(PORT, this.port);
- this.rmiId = parameters.getString(RMI_ID, this.rmiId);
-
- if (this.remote == null) {
- throw new IllegalStateException(String.format(
- "No remote configured for %s.", this));
- }
-
- if (this.rmiId == null) {
- throw new IllegalStateException(String.format(
- "No registry ID configured for %s.", this));
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- // get the remote's RMI Registry
- try {
- registry = LocateRegistry.getRegistry(this.remote, this.port);
- } catch (RemoteException e) {
- throw new IllegalStateException(e);
- }
-
- // try to get an intance of an IRemoteCollector implementation
- try {
- this.remoteCollector = (RemoteCollector<T>) registry
- .lookup(this.rmiId);
- } catch (AccessException e) {
- throw new IllegalStateException(e);
- } catch (RemoteException e) {
- throw new IllegalStateException(e);
- } catch (NotBoundException e) {
- throw new IllegalStateException(e);
- }
- }
-
- /**
- * This method forwards records simply to the remote's
- * {@link RemoteCollector} implementation
- */
- @Override
- public void writeRecord(T record) throws IOException {
- remoteCollector.collect(record);
- }
-
- /**
- * This method unbinds the reference of the implementation of
- * {@link RemoteCollector}.
- */
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public String toString() {
- return "RemoteCollectorOutputFormat(" + remote + ":" + port + ", "
- + rmiId + ")";
- }
-
-}