Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 08:01:36 UTC

[1/5] flink git commit: [FLINK-6655] Add validateAndNormalizeUri method to MemoryArchivist

Repository: flink
Updated Branches:
  refs/heads/release-1.3 db7f0ffd4 -> d02e688e7


[FLINK-6655] Add validateAndNormalizeUri method to MemoryArchivist


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d02e688e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d02e688e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d02e688e

Branch: refs/heads/release-1.3
Commit: d02e688e72c3d7a9a91a85f5816209a5d8d9aa60
Parents: c0ad469
Author: zhangminglei <zm...@163.com>
Authored: Tue Jun 27 21:26:18 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 30 08:51:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/MemoryArchivist.scala    | 66 ++++++++++++++++----
 1 file changed, 55 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d02e688e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index d83f2cd..327e2a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import java.io.IOException
+import java.net.URI
 import java.util
 
 import akka.actor.ActorRef
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.Path
+import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, Executio
 import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
 
 import scala.collection.mutable
 import scala.concurrent.future
@@ -86,7 +86,7 @@ class MemoryArchivist(
   }
 
   override def handleMessage: Receive = {
-    
+
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) =>
       // Keep lru order in case we override a graph (from multiple job submission in one session).
@@ -109,7 +109,7 @@ class MemoryArchivist(
       trimHistory()
 
     case msg : InfoMessage => handleWebServerInfoMessage(msg, sender())
-      
+
     case RequestArchivedJob(jobID: JobID) =>
       val graph = graphs.get(jobID)
       sender ! decorateMessage(ArchivedJob(graph))
@@ -165,7 +165,7 @@ class MemoryArchivist(
     throw new RuntimeException("Received unknown message " + message)
   }
 
-  
+
   private def handleWebServerInfoMessage(message: InfoMessage, theSender: ActorRef): Unit = {
     message match {
       case _ : RequestJobsOverview =>
@@ -175,7 +175,7 @@ class MemoryArchivist(
         catch {
           case t: Throwable => log.error("Exception while creating the jobs overview", t)
         }
-  
+
       case _ : RequestJobsWithIDsOverview =>
         try {
           sender ! decorateMessage(createJobsWithIDsOverview())
@@ -188,7 +188,7 @@ class MemoryArchivist(
         val details = graphs.values.map {
           v => WebMonitorUtils.createDetailsForJob(v)
         }.toArray[JobDetails]
-        
+
         theSender ! decorateMessage(new MultipleJobsDetails(null, details))
     }
   }
@@ -198,7 +198,7 @@ class MemoryArchivist(
     // so we aren't archiving it yet.
     if (archivePath.isDefined && graph.getState.isGloballyTerminalState) {
       try {
-        val p = FsStateBackend.validateAndNormalizeUri(archivePath.get.toUri)
+        val p = validateAndNormalizeUri(archivePath.get.toUri)
         future {
           try {
             FsJobArchivist.archiveJob(p, graph)
@@ -217,7 +217,7 @@ class MemoryArchivist(
   // --------------------------------------------------------------------------
   //  Request Responses
   // --------------------------------------------------------------------------
-  
+
   private def createJobsOverview() : JobsOverview = {
     new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
   }
@@ -239,7 +239,7 @@ class MemoryArchivist(
 
     new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
   }
-  
+
   // --------------------------------------------------------------------------
   //  Utilities
   // --------------------------------------------------------------------------
@@ -255,4 +255,48 @@ class MemoryArchivist(
       graphs.remove(jobID)
     }
   }
+
+  /**
+    * Checks and normalizes the archive path URI. This method first checks the validity of the
+    * URI (scheme, path, availability of a matching file system) and then normalizes the URI
+    * to a path.
+    *
+    * If the URI does not include an authority, but the file system configured for the URI has an
+    * authority, then the normalized path will include this authority.
+    *
+    * @param archivePathUri The URI to check and normalize.
+    * @return a normalized URI as a Path.
+    *
+    * @throws IllegalArgumentException Thrown, if the URI is missing a scheme or a path.
+    * @throws IOException Thrown, if no file system can be found for the URI's scheme.
+    */
+  @throws[IOException]
+  private def validateAndNormalizeUri(archivePathUri: URI): Path = {
+    val scheme = archivePathUri.getScheme
+    val path = archivePathUri.getPath
+
+    // some validity checks
+    if (scheme == null) {
+      throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+        "Please specify the file system scheme explicitly in the URI: " + archivePathUri)
+    }
+
+    if (path == null) {
+      throw new IllegalArgumentException("The path to store the job archives is null. " +
+        "Please specify a directory path for storing job archives. and the URI is: " +
+        archivePathUri)
+    }
+
+    if (path.length == 0 || path == "/") {
+      throw new IllegalArgumentException("Cannot use the root directory for storing job archives.")
+    }
+
+    if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
+      // no Flink-supported file system matches this scheme; fail early, since
+      // the required filesystem classes may not be available at runtime
+      throw new IllegalArgumentException("No file system found with scheme " + scheme
+        + ", referenced in file URI '" + archivePathUri.toString + "'.")
+    }
+    new Path(archivePathUri)
+  }
 }
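
The previous code delegated to FsStateBackend.validateAndNormalizeUri (its import is removed above); this commit inlines an equivalent check into MemoryArchivist. The same rules over java.net.URI, as a rough standalone Java sketch (isSupportedScheme is a hypothetical stand-in for FileSystem.isFlinkSupportedScheme):

    import java.net.URI;
    import java.util.function.Predicate;

    public class ArchiveUriCheckSketch {
        // Mirrors the validation order above: scheme, then path, then file system support.
        static void validate(URI uri, Predicate<String> isSupportedScheme) {
            if (uri.getScheme() == null) {
                throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null: " + uri);
            }
            String path = uri.getPath();
            if (path == null || path.isEmpty() || "/".equals(path)) {
                throw new IllegalArgumentException("A non-root directory path is required: " + uri);
            }
            if (!isSupportedScheme.test(uri.getScheme())) {
                throw new IllegalArgumentException("No file system found with scheme " + uri.getScheme());
            }
        }

        public static void main(String[] args) {
            // Passes: has a scheme, a non-root path, and (here) a supported scheme.
            validate(URI.create("hdfs://namenode:9000/completed-jobs"), s -> true);
            // Rejected: URI.create("/completed-jobs") has no scheme;
            // URI.create("file:///") has only the root path.
        }
    }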


[2/5] flink git commit: [FLINK-6742] Add eager checks for parallelism/chain-length change

Posted by ch...@apache.org.
[FLINK-6742] Add eager checks for parallelism/chain-length change


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c65317dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c65317dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c65317dc

Branch: refs/heads/release-1.3
Commit: c65317dc9619f2a5459c39278b2109137e94d79f
Parents: 1d2c615
Author: zentol <ch...@apache.org>
Authored: Mon Jun 26 13:38:54 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 30 08:51:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/savepoint/SavepointV2.java      | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c65317dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 5e46f93..bd364a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -178,6 +179,18 @@ public class SavepointV2 implements Savepoint {
 
 			List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
 
+			Preconditions.checkArgument(
+				jobVertex.getParallelism() == taskState.getParallelism(),
+				"Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() +"." +
+					"When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
+					"to the parallelism of stateful operators.");
+
+			Preconditions.checkArgument(
+				operatorIDs.size() == taskState.getChainLength(),
+				"Detected change in chain length during migration for task " + jobVertex.getJobVertexId() +". " +
+					"When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
+					"changed by modification of a chain containing a stateful operator.");
+
 			for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
 				SubtaskState subtaskState;
 				try {
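
Preconditions.checkArgument throws an IllegalArgumentException carrying the supplied message when the condition is false, so both guards fail fast at savepoint-conversion time rather than surfacing later as a state-assignment error. A minimal self-contained sketch of the pattern (the parallelism values are made up for illustration):

    import org.apache.flink.util.Preconditions;

    public class EagerCheckSketch {
        public static void main(String[] args) {
            int vertexParallelism = 4;    // parallelism of the vertex in the new job graph (made up)
            int savepointParallelism = 8; // parallelism recorded in the savepoint (made up)

            // Throws IllegalArgumentException with the given message when the condition is false.
            Preconditions.checkArgument(
                vertexParallelism == savepointParallelism,
                "Detected change in parallelism: job graph has " + vertexParallelism
                    + " but the savepoint was taken with " + savepointParallelism + ".");
        }
    }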


[4/5] flink git commit: [FLINK-6987] Fix TextInputFormatTest for paths with spaces

Posted by ch...@apache.org.
[FLINK-6987] Fix TextInputFormatTest for paths with spaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0ad4699
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0ad4699
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0ad4699

Branch: refs/heads/release-1.3
Commit: c0ad4699af12454f54ff1dd977c6fc562f09a304
Parents: c65317d
Author: zhangminglei <zm...@163.com>
Authored: Tue Jun 27 11:34:37 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 30 08:51:49 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/TextInputFormatTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0ad4699/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index 4a52eea..6bff9db 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -108,7 +108,7 @@ public class TextInputFormatTest {
 			}
 			File parentDir = new File("tmp");
 
-			TextInputFormat inputFormat = new TextInputFormat(new Path(parentDir.toURI().toString()));
+			TextInputFormat inputFormat = new TextInputFormat(new Path(parentDir.toURI()));
 			inputFormat.setNestedFileEnumeration(true);
 			inputFormat.setNumLineSamples(10);
 
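The one-line fix matters because File.toURI() percent-encodes special characters: the string form of the URI contains %20 for each space, and building a Path from that string keeps the encoded form in the path component, while building it from the URI yields the decoded path. The underlying JDK behavior, as a standalone sketch:

    import java.io.File;
    import java.net.URI;

    public class SpacesInPathsSketch {
        public static void main(String[] args) {
            URI uri = new File("tmp dir with spaces").toURI();
            // The string form is percent-encoded, e.g. file:/.../tmp%20dir%20with%20spaces/
            System.out.println(uri.toString());
            // The path component is decoded back to real spaces.
            System.out.println(uri.getPath());
        }
    }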


[3/5] flink git commit: [FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor

Posted by ch...@apache.org.
[FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17c5de5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17c5de5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17c5de5a

Branch: refs/heads/release-1.3
Commit: 17c5de5a35b4fa72e3636539dfec2cd9135ef91d
Parents: db7f0ff
Author: zjureel <zj...@gmail.com>
Authored: Wed May 31 13:13:34 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 30 08:51:49 2017 +0200

----------------------------------------------------------------------
 .../flink/yarn/AbstractYarnClusterDescriptor.java    | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17c5de5a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b9a4416..bfb9625 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -385,6 +385,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	@Override
 	public YarnClusterClient retrieve(String applicationID) {
 
+		YarnClient yarnClient = null;
 		try {
 			// check if required Hadoop environment variables are set. If not, warn user
 			if (System.getenv("HADOOP_CONF_DIR") == null &&
@@ -395,7 +396,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 
 			final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
-			final YarnClient yarnClient = getYarnClient();
+			yarnClient = getYarnClient();
 			final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
 
 			if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
@@ -413,6 +414,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
 		} catch (Exception e) {
+			if (null != yarnClient) {
+				yarnClient.stop();
+			}
 			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
 	}
@@ -533,7 +537,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
 			"the resources become available.";
 		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
-		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		ClusterResourceDescription freeClusterMem;
+		try {
+			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		} catch (YarnException | IOException e) {
+			failSessionDuringDeployment(yarnClient, yarnApplication);
+			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
+		}
+
 		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
 			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
 				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);


[5/5] flink git commit: [FLINK-6898] [metrics] Limit size of operator component in metric name

Posted by ch...@apache.org.
[FLINK-6898] [metrics] Limit size of operator component in metric name


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d2c6158
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d2c6158
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d2c6158

Branch: refs/heads/release-1.3
Commit: 1d2c615853ffa627a61de6c9880c53c3b00f1e31
Parents: 17c5de5
Author: zentol <ch...@apache.org>
Authored: Mon Jun 12 15:36:25 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 30 08:51:49 2017 +0200

----------------------------------------------------------------------
 .../metrics/groups/AbstractMetricGroup.java      |  2 +-
 .../runtime/metrics/groups/TaskMetricGroup.java  |  6 ++++++
 .../metrics/groups/TaskMetricGroupTest.java      | 19 +++++++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d2c6158/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index a19970d..cf8a89c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -63,7 +63,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> implements MetricGroup {
 
 	/** shared logger */
-	private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
+	protected static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1d2c6158/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 43e8e1b..dfc0107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -41,6 +41,8 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 
 	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
+	static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
+
 	private final TaskIOMetricGroup ioMetrics;
 	
 	/** The execution Id uniquely identifying the executed task represented by this metrics group */
@@ -130,6 +132,10 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 	// ------------------------------------------------------------------------
 
 	public OperatorMetricGroup addOperator(String name) {
+		if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
+			LOG.warn("The operator name {} exceeded the {}-character length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
+			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
+		}
 		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
 
 		synchronized (this) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1d2c6158/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 183237a..267c91c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -25,9 +25,11 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -146,6 +148,23 @@ public class TaskMetricGroupTest extends TestLogger {
 		registry.shutdown();
 	}
 
+	@Test
+	public void testOperatorNameTruncation() {
+		Configuration cfg = new Configuration();
+		cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname");
+		TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new AbstractID(), new AbstractID(), "task", 0, 0);
+		
+		String originalName = new String(new char[100]).replace("\0", "-");
+		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
+		
+		String storedName = operatorMetricGroup.getScopeComponents()[0];
+		Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
+		Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName);
+	}
+
 	private static class CountingMetricRegistry extends MetricRegistry {
 
 		private int counter = 0;
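
The cap itself is a two-line rule: names longer than METRICS_OPERATOR_NAME_MAX_LENGTH (80) are cut to that length before the OperatorMetricGroup is created, which is what the new test asserts. The rule in isolation, as a rough sketch (constant value taken from the diff above):

    public class OperatorNameTruncationSketch {
        static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80; // value from TaskMetricGroup

        static String truncate(String name) {
            if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
                return name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
            }
            return name;
        }

        public static void main(String[] args) {
            String longName = new String(new char[100]).replace("\0", "-"); // 100 dashes, as in the test
            System.out.println(truncate(longName).length()); // prints 80
        }
    }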