You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/05 16:24:36 UTC

[2/4] flink git commit: [FLINK-1882] Removed RemoteCollector classes

[FLINK-1882] Removed RemoteCollector classes

This closes #985


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/100e8c5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/100e8c5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/100e8c5f

Branch: refs/heads/master
Commit: 100e8c5ff9f6d25b3d5db326a5f31b9c4432e334
Parents: 5546a1e
Author: zentol <s....@web.de>
Authored: Sat Jul 25 15:17:55 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200

----------------------------------------------------------------------
 .../RemoteCollectorOutputFormatExample.java     | 114 ----------
 .../flink/api/java/io/RemoteCollector.java      |  46 ----
 .../api/java/io/RemoteCollectorConsumer.java    |  26 ---
 .../flink/api/java/io/RemoteCollectorImpl.java  | 228 -------------------
 .../java/io/RemoteCollectorOutputFormat.java    | 175 --------------
 5 files changed, 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
deleted file mode 100644
index f524718..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.misc;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.io.RemoteCollectorConsumer;
-import org.apache.flink.api.java.io.RemoteCollectorImpl;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over some sample data and collects the results with an
- * implementation of a {@link RemoteCollectorConsumer}.
- */
-@SuppressWarnings("serial")
-public class RemoteCollectorOutputFormatExample {
-
-	public static void main(String[] args) throws Exception {
-
-		/**
-		 * We create a remote {@link ExecutionEnvironment} here, because this
-		 * OutputFormat is designed for use in a distributed setting. For local
-		 * use you should consider using the LocalCollectionOutputFormat
-		 * instead.
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
-				"/path/to/your/file.jar");
-
-		// get input data
-		DataSet<String> text = env.fromElements(
-				"To be, or not to be,--that is the question:--",
-				"Whether 'tis nobler in the mind to suffer",
-				"The slings and arrows of outrageous fortune",
-				"Or to take arms against a sea of troubles,");
-
-		DataSet<Tuple2<String, Integer>> counts =
-		// split up the lines into pairs (2-tuples) containing: (word,1)
-		text.flatMap(new LineSplitter())
-		// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).aggregate(Aggregations.SUM, 1);
-
-		// emit result: print every collected record
-		RemoteCollectorImpl.collectLocal(counts,
-				new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
-					// user-defined RemoteCollectorConsumer
-					@Override
-					public void collect(Tuple2<String, Integer> element) {
-						System.out.println("word/occurrences:" + element);
-					}
-				});
-
-		// local collection to store results in
-		Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
-		// collect results from remote in local collection
-		RemoteCollectorImpl.collectLocal(counts, collection);
-
-		// execute program
-		env.execute("WordCount Example with RemoteCollectorOutputFormat");
-
-		System.out.println(collection);
-		
-		RemoteCollectorImpl.shutdownAll();
-	}
-
-	//
-	// User Functions
-	//
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a
-	 * user-defined FlatMapFunction. The function takes a line (String) and
-	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
-	 * Integer>).
-	 */
-	public static final class LineSplitter implements
-			FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs, skipping empty tokens
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
deleted file mode 100644
index bcfc332..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.java.DataSet;
-
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-
-/**
- * This interface is the counterpart to the {@link RemoteCollectorOutputFormat};
- * implementations receive remote results through the {@code collect} method.
- *
- * @param <T>
- *            The type of the records the collector will receive
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public interface RemoteCollector<T> extends Remote {
-
-	public void collect(T element) throws RemoteException;
-
-	public RemoteCollectorConsumer<T> getConsumer() throws RemoteException;
-
-	public void setConsumer(RemoteCollectorConsumer<T> consumer)
-			throws RemoteException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
deleted file mode 100644
index 439c6af..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-/**
- * Callback that receives the elements collected by a {@link RemoteCollector} implementation.
- */
-public interface RemoteCollectorConsumer<T> {
-	public void collect(T element);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
deleted file mode 100644
index 2d080ab..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.rmi.AccessException;
-import java.rmi.AlreadyBoundException;
-import java.rmi.NotBoundException;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-
-/**
- * RMI-exported counterpart of the {@link RemoteCollectorOutputFormat} that
- * forwards every collected record to a {@link RemoteCollectorConsumer}.
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-
-@Deprecated
-public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements
-		RemoteCollector<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Instance of an implementation of a {@link RemoteCollectorConsumer}.
-	 * Every record passed to {@link #collect(Object)} is handed to it.
-	 */
-
-	private RemoteCollectorConsumer<T> consumer;
-	
-        /**
-         * This list stores all created {@link Registry}s so that the
-         * {@link Remote} objects bound in them ({@link RemoteCollectorImpl}
-         * instances in our case) can be unbound and unexported on shutdown.
-         */
-	private static List<Registry> registries = new ArrayList<Registry>();
-
-	/**
-	 * Creates a {@link RemoteCollectorImpl}, binds it under {@code rmiId} in a
-	 * newly created local RMI {@link Registry} and then sets its consumer.
-	 * Exceptions raised while creating or binding are only printed.
-	 *
-	 * @param port
-	 *            The port where the local collector is listening.
-	 * @param consumer
-	 *            The consumer instance.
-	 * @param rmiId
-	 *            An ID to register the collector in the RMI registry.
-	 */
-	public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) {
-		RemoteCollectorImpl<T> collectorInstance = null;
-
-		try {
-			collectorInstance = new RemoteCollectorImpl<T>();
-
-			Registry registry;
-
-			registry = LocateRegistry.createRegistry(port);
-			registry.bind(rmiId, collectorInstance);
-			
-			registries.add(registry);
-		} catch (RemoteException e) {
-			e.printStackTrace();
-		} catch (AlreadyBoundException e) {
-			e.printStackTrace();
-		}
-
-		collectorInstance.setConsumer(consumer);
-	}
-
-	/**
-	 * Writes a DataSet to a {@link RemoteCollectorConsumer} through a
-	 * {@link RemoteCollector} that is called remotely by the
-	 * {@link RemoteCollectorOutputFormat}.
-	 *
-	 * @return The DataSink that writes the DataSet.
-	 */
-	public static <T> DataSink<T> collectLocal(DataSet<T> source,
-			RemoteCollectorConsumer<T> consumer) {
-		// if java.rmi.server.hostname was not set by the user, make a "good guess"
-		String ip = System.getProperty("java.rmi.server.hostname");
-		if (ip == null) {
-			Enumeration<NetworkInterface> networkInterfaces = null;
-			try {
-				networkInterfaces = NetworkInterface.getNetworkInterfaces();
-			} catch (Throwable t) {
-				throw new RuntimeException(t);
-			}
-			while (networkInterfaces.hasMoreElements()) {
-				NetworkInterface networkInterface = (NetworkInterface) networkInterfaces
-						.nextElement();
-				Enumeration<InetAddress> inetAddresses = networkInterface
-						.getInetAddresses();
-				while (inetAddresses.hasMoreElements()) {
-					InetAddress inetAddress = (InetAddress) inetAddresses
-							.nextElement();
-					if (!inetAddress.isLoopbackAddress()
-							&& inetAddress instanceof Inet4Address) {
-						ip = inetAddress.getHostAddress();
-						System.setProperty("java.rmi.server.hostname", ip);
-					}
-				}
-			}
-		}
-
-		// find a free port: open a ServerSocket on port 0, read its port and close it again
-		Integer randomPort = 0;
-		try {
-			ServerSocket tmp = new ServerSocket(0);
-			randomPort = tmp.getLocalPort();
-			tmp.close();
-		} catch (Throwable t) {
-			throw new RuntimeException(t);
-		}
-
-		// create an ID for this output format instance
-		String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());
-		
-		// create the local listening object and bind it to the RMI registry
-		RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);
-
-		// create and configure the output format
-		OutputFormat<T> remoteCollectorOutputFormat = new RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId);
-
-		// create sink
-		return source.output(remoteCollectorOutputFormat);
-	}
-
-	/**
-	 * Writes a DataSet to a local {@link Collection} through a
-	 * {@link RemoteCollector} and a standard {@link RemoteCollectorConsumer}
-	 * implementation that adds each element to a synchronized view of the
-	 * collection.
-	 *
-	 * @param source the source data set
-	 * @param collection the local collection
-	 */
-	public static <T> void collectLocal(DataSet<T> source,
-			Collection<T> collection) {
-		final Collection<T> synchronizedCollection = Collections
-				.synchronizedCollection(collection);
-		collectLocal(source, new RemoteCollectorConsumer<T>() {
-			@Override
-			public void collect(T element) {
-				synchronizedCollection.add(element);
-			}
-		});
-	}
-
-	/**
-	 * Private constructor; instances are created in {@code createAndBind}.
-	 *
-	 * @throws RemoteException propagated from the {@link UnicastRemoteObject} constructor
-	 */
-	private RemoteCollectorImpl() throws RemoteException {
-		super();
-	}
-
-	/**
-	 * This method is called by the remote to collect records; it forwards the record to the consumer.
-	 */
-	@Override
-	public void collect(T element) throws RemoteException {
-		this.consumer.collect(element);
-	}
-
-	@Override
-	public RemoteCollectorConsumer<T> getConsumer() {
-		return this.consumer;
-	}
-
-	@Override
-	public void setConsumer(RemoteCollectorConsumer<T> consumer) {
-		this.consumer = consumer;
-	}
-
-	/**
-	 * Unbinds and unexports every {@link Remote} object listed in the registries created by this class.
-	 *
-	 * @throws AccessException propagated from the registry calls
-	 * @throws RemoteException propagated from the registry calls
-	 * @throws NotBoundException propagated from the registry lookup or unbind
-	 */
-	public static void shutdownAll() throws AccessException, RemoteException, NotBoundException {
-		for (Registry registry : registries) {
-			for (String id : registry.list()) {
-				Remote remote = registry.lookup(id);
-				registry.unbind(id);
-				UnicastRemoteObject.unexportObject(remote, true);
-			}
-
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
deleted file mode 100644
index 3fe5cef..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-import java.rmi.AccessException;
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * An output format that sends results through Java RMI to a
- * {@link RemoteCollector} implementation. The client has to provide an
- * implementation of {@link RemoteCollector} and has to write its plan's output
- * into an instance of {@link RemoteCollectorOutputFormat}. Furthermore, the
- * client's VM parameter -Djava.rmi.server.hostname should be set to the
- * client's own IP address.
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
-
-	private static final long serialVersionUID = 1922744224032398102L;
-
-	/**
-	 * The reference of the {@link RemoteCollector} object; looked up in {@code open}
-	 */
-	private transient RemoteCollector<T> remoteCollector;
-
-	transient private Registry registry;
-
-	/**
-	 * Config parameter for the remote's port number
-	 */
-	public static final String PORT = "port";
-	/**
-	 * Config parameter for the remote's address
-	 */
-	public static final String REMOTE = "remote";
-	/**
-	 * Config parameter for the ID under which the collector is looked up in the RMI registry
-	 */
-	public static final String RMI_ID = "rmiId";
-
-	private String remote;
-
-	private int port;
-
-	private String rmiId;
-
-	/**
-	 * Create a new {@link RemoteCollectorOutputFormat} instance for
-	 * localhost:8888 with no RMI ID. Note: the delegated constructor rejects
-	 * a null RMI ID, so this call throws an {@link IllegalStateException}.
-	 *
-	 * @see RemoteCollectorOutputFormat#REMOTE
-	 * @see RemoteCollectorOutputFormat#PORT
-	 */
-	public RemoteCollectorOutputFormat() {
-		this("localhost", 8888, null);
-	}
-
-	/**
-	 * Creates a new {@link RemoteCollectorOutputFormat} instance for the
-	 * specified remote and port.
-	 * @param rmiId the ID under which the collector is looked up in the RMI registry
-	 * @throws IllegalStateException if {@code remote} or {@code rmiId} is null
-	 */
-	public RemoteCollectorOutputFormat(String remote, int port, String rmiId) {
-		super();
-		this.remote = remote;
-		this.port = port;
-		this.rmiId = rmiId;
-		
-		if (this.remote == null) {
-			throw new IllegalStateException(String.format(
-					"No remote configured for %s.", this));
-		}
-
-		if (this.rmiId == null) {
-			throw new IllegalStateException(String.format(
-					"No registry ID configured for %s.", this));
-		}
-	}
-
-	@Override
-	/**
-	 * This method reads the fields "remote", "port" and "rmiId" from the Configuration, keeping the current values as defaults.
-	 */
-	public void configure(Configuration parameters) {
-		this.remote = parameters.getString(REMOTE, this.remote);
-		this.port = parameters.getInteger(PORT, this.port);
-		this.rmiId = parameters.getString(RMI_ID, this.rmiId);
-
-		if (this.remote == null) {
-			throw new IllegalStateException(String.format(
-					"No remote configured for %s.", this));
-		}
-
-		if (this.rmiId == null) {
-			throw new IllegalStateException(String.format(
-					"No registry ID configured for %s.", this));
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		// get the remote's RMI Registry
-		try {
-			registry = LocateRegistry.getRegistry(this.remote, this.port);
-		} catch (RemoteException e) {
-			throw new IllegalStateException(e);
-		}
-
-		// try to get an instance of a RemoteCollector implementation
-		try {
-			this.remoteCollector = (RemoteCollector<T>) registry
-					.lookup(this.rmiId);
-		} catch (AccessException e) {
-			throw new IllegalStateException(e);
-		} catch (RemoteException e) {
-			throw new IllegalStateException(e);
-		} catch (NotBoundException e) {
-			throw new IllegalStateException(e);
-		}
-	}
-
-	/**
-	 * This method forwards records simply to the remote's
-	 * {@link RemoteCollector} implementation
-	 */
-	@Override
-	public void writeRecord(T record) throws IOException {
-		remoteCollector.collect(record);
-	}
-
-	/**
-	 * This method currently does nothing; the {@link RemoteCollector}
-	 * reference is not unbound or cleared here.
-	 */
-	@Override
-	public void close() throws IOException {
-	}
-
-	@Override
-	public String toString() {
-		return "RemoteCollectorOutputFormat(" + remote + ":" + port + ", "
-				+ rmiId + ")";
-	}
-
-}