You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/26 16:04:47 UTC

[5/9] flink git commit: [FLINK-7647] [flip6] Port JobManagerConfigHandler to new REST endpoint

[FLINK-7647] [flip6] Port JobManagerConfigHandler to new REST endpoint

This commit lets the JobManagerConfigHandler implement the
LegacyRestHandler interface in order to be ported to the new REST
endpoint. This includes the introduction of ClusterConfiguration
response body and ClusterConfigurationHeaders.

The DispatcherRestEndpoint now also registers the
JobManagerConfigHandler.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97ff043f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97ff043f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97ff043f

Branch: refs/heads/master
Commit: 97ff043fda06147bfd6441495d893fce7baf882e
Parents: 6c97230
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Sep 20 18:25:28 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Sep 26 18:04:07 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 21 ++++-
 .../entrypoint/SessionClusterEntrypoint.java    |  1 +
 .../handler/legacy/JobManagerConfigHandler.java | 97 ++++++++++++--------
 .../messages/ClusterConfigurationInfo.java      | 59 ++++++++++++
 .../messages/ClusterConfigurationInfoEntry.java | 81 ++++++++++++++++
 .../ClusterConfigurationInfoHeaders.java        | 71 ++++++++++++++
 .../legacy/JobManagerConfigHandlerTest.java     |  3 +-
 .../messages/ClusterConfigurationInfoTest.java  | 53 +++++++++++
 8 files changed, 345 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97ff043f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index dff5df8..3e8a4cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -29,10 +30,13 @@ import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
@@ -58,16 +62,19 @@ import java.util.concurrent.Executor;
 public class DispatcherRestEndpoint extends RestServerEndpoint {
 
 	private final GatewayRetriever<DispatcherGateway> leaderRetriever;
+	private final Configuration clusterConfiguration;
 	private final RestHandlerConfiguration restConfiguration;
 	private final Executor executor;
 
 	public DispatcherRestEndpoint(
-			RestServerEndpointConfiguration configuration,
+			RestServerEndpointConfiguration endpointConfiguration,
 			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			Configuration clusterConfiguration,
 			RestHandlerConfiguration restConfiguration,
 			Executor executor) {
-		super(configuration);
+		super(endpointConfiguration);
 		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
+		this.clusterConfiguration = Preconditions.checkNotNull(clusterConfiguration);
 		this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
 		this.executor = Preconditions.checkNotNull(executor);
 	}
@@ -107,6 +114,15 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 				true,
 				true));
 
+		LegacyRestHandlerAdapter<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> clusterConfigurationHandler = new LegacyRestHandlerAdapter<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			ClusterConfigurationInfoHeaders.getInstance(),
+			new JobManagerConfigHandler(
+				executor,
+				clusterConfiguration));
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -123,6 +139,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		}
 
 		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
+		handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
 		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
 		handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97ff043f/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index c4da6db..be34b0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -159,6 +159,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 		return new DispatcherRestEndpoint(
 			RestServerEndpointConfiguration.fromConfiguration(configuration),
 			dispatcherGatewayRetriever,
+			configuration,
 			RestHandlerConfiguration.fromConfiguration(configuration),
 			executor);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ff043f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
index d638467..6c0c158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
@@ -19,8 +19,17 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
+import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfoEntry;
+import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -34,55 +43,67 @@ import java.util.concurrent.Executor;
 /**
  * Returns the Job Manager's configuration.
  */
-public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
+public class JobManagerConfigHandler extends AbstractJsonRequestHandler
+		implements LegacyRestHandler<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> {
 
-	private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config";
-
-	private final Configuration config;
+	private final ClusterConfigurationInfo clusterConfig;
+	private final String clusterConfigJson;
 
 	public JobManagerConfigHandler(Executor executor, Configuration config) {
 		super(executor);
-		this.config = config;
+
+		Preconditions.checkNotNull(config);
+		this.clusterConfig = ClusterConfigurationInfo.from(config);
+		this.clusterConfigJson = createConfigJson(config);
 	}
 
 	@Override
 	public String[] getPaths() {
-		return new String[]{JOBMANAGER_CONFIG_REST_PATH};
+		return new String[]{ClusterConfigurationInfoHeaders.CLUSTER_CONFIG_REST_PATH};
 	}
 
 	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		return CompletableFuture.supplyAsync(
-			() -> {
-				try {
-					StringWriter writer = new StringWriter();
-					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-					gen.writeStartArray();
-					for (String key : config.keySet()) {
-						gen.writeStartObject();
-						gen.writeStringField("key", key);
-
-						// Mask key values which contain sensitive information
-						if (key.toLowerCase().contains("password")) {
-							String value = config.getString(key, null);
-							if (value != null) {
-								value = "******";
-							}
-							gen.writeStringField("value", value);
-						} else {
-							gen.writeStringField("value", config.getString(key, null));
-						}
-						gen.writeEndObject();
-					}
-					gen.writeEndArray();
-
-					gen.close();
-					return writer.toString();
-				} catch (IOException e) {
-					throw new CompletionException(new FlinkException("Could not write configuration.", e));
+	public CompletableFuture<ClusterConfigurationInfo> handleRequest(
+			HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			DispatcherGateway gateway) {
+
+		return CompletableFuture.completedFuture(clusterConfig);
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
+
+		return CompletableFuture.completedFuture(clusterConfigJson);
+	}
+
+	private static String createConfigJson(Configuration config) {
+		try {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+			gen.writeStartArray();
+			for (String key : config.keySet()) {
+				gen.writeStartObject();
+				gen.writeStringField(ClusterConfigurationInfoEntry.FIELD_NAME_CONFIG_KEY, key);
+
+				String value = config.getString(key, null);
+				// Mask key values which contain sensitive information
+				if (value != null && key.toLowerCase().contains("password")) {
+					value = "******";
 				}
-			},
-			executor);
+				gen.writeStringField(ClusterConfigurationInfoEntry.FIELD_NAME_CONFIG_VALUE, value);
+
+				gen.writeEndObject();
+			}
+			gen.writeEndArray();
+
+			gen.close();
+			return writer.toString();
+		} catch (IOException e) {
+			throw new CompletionException(new FlinkException("Could not write configuration.", e));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97ff043f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfo.java
new file mode 100644
index 0000000..8b87e63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+
+/**
+ * Response of the {@link JobManagerConfigHandler}, respresented as a list
+ * of key-value pairs of the cluster {@link Configuration}.
+ */
+public class ClusterConfigurationInfo extends ArrayList<ClusterConfigurationInfoEntry> implements ResponseBody {
+
+	private static final long serialVersionUID = -1170348873871206964L;
+
+	// a default constructor is required for collection type marshalling
+	public ClusterConfigurationInfo() {}
+
+	public ClusterConfigurationInfo(int initialEntries) {
+		super(initialEntries);
+	}
+
+	public static ClusterConfigurationInfo from(Configuration config) {
+		ClusterConfigurationInfo clusterConfig = new ClusterConfigurationInfo(config.keySet().size());
+
+		for (String key : config.keySet()) {
+			String value = config.getString(key, null);
+
+			// Mask key values which contain sensitive information
+			if (value != null && key.toLowerCase().contains("password")) {
+				value = "******";
+			}
+
+			clusterConfig.add(new ClusterConfigurationInfoEntry(key, value));
+		}
+
+		return clusterConfig;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ff043f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoEntry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoEntry.java
new file mode 100644
index 0000000..19e3821
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoEntry.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * A single key-value pair entry in the {@link ClusterConfigurationInfo} response.
+ */
+public class ClusterConfigurationInfoEntry {
+
+	public static final String FIELD_NAME_CONFIG_KEY = "key";
+	public static final String FIELD_NAME_CONFIG_VALUE = "value";
+
+	@JsonProperty(FIELD_NAME_CONFIG_KEY)
+	private final String key;
+
+	@JsonProperty(FIELD_NAME_CONFIG_VALUE)
+	private final String value;
+
+	@JsonCreator
+	public ClusterConfigurationInfoEntry(
+			@JsonProperty(FIELD_NAME_CONFIG_KEY) String key,
+			@JsonProperty(FIELD_NAME_CONFIG_VALUE) String value) {
+		this.key = Preconditions.checkNotNull(key);
+		this.value = Preconditions.checkNotNull(value);
+	}
+
+	public String getKey() {
+		return key;
+	}
+
+	public String getValue() {
+		return value;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		ClusterConfigurationInfoEntry that = (ClusterConfigurationInfoEntry) o;
+		return key.equals(that.key) && value.equals(that.value);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(key, value);
+	}
+
+	@Override
+	public String toString() {
+		return "(" + key + "," + value + ")";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ff043f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
new file mode 100644
index 0000000..764e497
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobManagerConfigHandler}.
+ */
+public final class ClusterConfigurationInfoHeaders implements MessageHeaders<EmptyRequestBody, ClusterConfigurationInfo, EmptyMessageParameters> {
+
+	private static final ClusterConfigurationInfoHeaders INSTANCE = new ClusterConfigurationInfoHeaders();
+
+	public static final String CLUSTER_CONFIG_REST_PATH = "/jobmanager/config";
+
+	private ClusterConfigurationInfoHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return CLUSTER_CONFIG_REST_PATH;
+	}
+
+	@Override
+	public Class<ClusterConfigurationInfo> getResponseClass() {
+		return ClusterConfigurationInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	public static ClusterConfigurationInfoHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97ff043f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
index 03ddb73..f21b1ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.Executors;
 
 import org.junit.Assert;
@@ -29,7 +30,7 @@ import org.junit.Test;
 public class JobManagerConfigHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null);
+		JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), new Configuration());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobmanager/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/97ff043f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java
new file mode 100644
index 0000000..1d7d9a4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link ClusterConfigurationInfo}.
+ */
+public class ClusterConfigurationInfoTest {
+
+	/**
+	 * Tests that we can marshal and unmarshal {@link ClusterConfigurationInfo} objects.
+	 */
+	@Test
+	public void testJsonMarshalling() throws JsonProcessingException {
+		final ClusterConfigurationInfo expected = new ClusterConfigurationInfo(2);
+		expected.add(new ClusterConfigurationInfoEntry("key1", "value1"));
+		expected.add(new ClusterConfigurationInfoEntry("key2", "value2"));
+
+		final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+		JsonNode marshaled = objectMapper.valueToTree(expected);
+
+		final ClusterConfigurationInfo unmarshaled = objectMapper.treeToValue(marshaled, ClusterConfigurationInfo.class);
+
+		assertEquals(expected, unmarshaled);
+	}
+}