You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/07 18:17:12 UTC

[1/2] flink git commit: [runtime] Remove old unused classes

Repository: flink
Updated Branches:
  refs/heads/master 4363f9884 -> 4b89855aa


[runtime] Remove old unused classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09bd1f89
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09bd1f89
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09bd1f89

Branch: refs/heads/master
Commit: 09bd1f89f4fc70318250463e653cf862d07c1b28
Parents: 4363f98
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 7 11:01:58 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 7 18:13:00 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/util/EnumUtils.java    | 79 --------------------
 .../runtime/util/UnmodifiableIterator.java      | 72 ------------------
 2 files changed, 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09bd1f89/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
deleted file mode 100644
index 3512e13..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnumUtils.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.flink.types.StringValue;
-
-/**
- * Auxiliary class to (de)serialize enumeration values.
- */
-public final class EnumUtils {
-
-	/**
-	 * Private constructor to overwrite public one.
-	 */
-	private EnumUtils() {
-	}
-
-	/**
-	 * Reads a value from the given enumeration from the specified input stream.
-	 * 
-	 * @param <T>
-	 *        the type of the enumeration
-	 * @param in
-	 *        the input stream to read from
-	 * @param enumType
-	 *        the class of the enumeration
-	 * @return the value of the given enumeration read from the input stream
-	 * @throws IOException
-	 *         thrown if any error occurred while reading data from the stream
-	 */
-	public static <T extends Enum<T>> T readEnum(final DataInput in, final Class<T> enumType) throws IOException {
-
-		if (!in.readBoolean()) {
-			return null;
-		}
-
-		return T.valueOf(enumType, StringValue.readString(in));
-	}
-
-	/**
-	 * Writes a value of an enumeration to the given output stream.
-	 * 
-	 * @param out
-	 *        the output stream to write to
-	 * @param enumVal
-	 *        the value of a enumeration to be written to the output stream
-	 * @throws IOException
-	 *         thrown if any error occurred while writing data to the stream
-	 */
-	public static void writeEnum(final DataOutput out, final Enum<?> enumVal) throws IOException {
-
-		if (enumVal == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			StringValue.writeString(enumVal.name(), out);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09bd1f89/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnmodifiableIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnmodifiableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnmodifiableIterator.java
deleted file mode 100644
index caa36c5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnmodifiableIterator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util;
-
-import java.util.Iterator;
-
-/**
- * An auxiliary implementation of an iterator which protects the underlying collection from being modified. As a result,
- * calling the remove method on this iterator will result in an {@link UnsupportedOperationException}.
- * <p>
- * This class is thread-safe.
- * 
- * @param <T>
- *        the type of the encapsulated iterator
- */
-public final class UnmodifiableIterator<T> implements Iterator<T> {
-
-	/**
-	 * The encapsulated iterator.
-	 */
-	private final Iterator<T> encapsulatedIterator;
-
-	/**
-	 * Constructs a new unmodifiable iterator.
-	 * 
-	 * @param encapsulatedIterator
-	 *        the encapsulated iterator
-	 */
-	public UnmodifiableIterator(final Iterator<T> encapsulatedIterator) {
-
-		this.encapsulatedIterator = encapsulatedIterator;
-	}
-
-
-	@Override
-	public boolean hasNext() {
-
-		return this.encapsulatedIterator.hasNext();
-	}
-
-
-	@Override
-	public T next() {
-
-		return this.encapsulatedIterator.next();
-	}
-
-
-	@Override
-	public void remove() {
-
-		throw new UnsupportedOperationException("Calling the remove method on this iterator is not allowed");
-	}
-
-}


[2/2] flink git commit: [runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager

Posted by se...@apache.org.
[runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b89855a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b89855a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b89855a

Branch: refs/heads/master
Commit: 4b89855aaab50ec785a0c5e0e19124f8f9ea9440
Parents: 09bd1f8
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 7 17:40:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 7 18:13:01 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala      | 15 ++++++++++++---
 .../flink/runtime/taskmanager/TaskManagerTest.java   |  2 +-
 2 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7d60c00..d6b91ec 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -608,6 +608,16 @@ extends Actor with ActorLogMessages with ActorLogging {
                                       id: InstanceID,
                                       blobPort: Int): Unit = {
 
+    if (jobManager == null) {
+      throw new NullPointerException("jobManager may not be null")
+    }
+    if (id == null) {
+      throw new NullPointerException("instance ID may not be null")
+    }
+    if (blobPort <= 0 || blobPort > 65535) {
+      throw new IllegalArgumentException("blob port is out of range: " + blobPort)
+    }
+
     // sanity check that we are not currently registered with a different JobManager
     if (isConnected) {
       if (currentJobManager.get == jobManager) {
@@ -644,9 +654,8 @@ extends Actor with ActorLogMessages with ActorLogging {
 
     // start a blob service, if a blob server is specified
     if (blobPort > 0) {
-      val address = new InetSocketAddress(
-        currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
-        blobPort)
+      val jmHost = jobManager.path.address.host.getOrElse("localhost")
+      val address = new InetSocketAddress(jmHost, blobPort)
 
       LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e736a55..760b14e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -566,7 +566,7 @@ public class TaskManagerTest {
 			if (message instanceof RegistrationMessages.RegisterTaskManager) {
 				final InstanceID iid = new InstanceID();
 				final ActorRef self = getSelf();
-				getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self);
+				getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self);
 			}
 			else if(message instanceof TaskMessages.UpdateTaskExecutionState){
 				getSender().tell(true, getSelf());