You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/11/28 21:52:29 UTC

[1/2] flink git commit: [hotfix] [optimizer] Normalize job plan operator formatting

Repository: flink
Updated Branches:
  refs/heads/master 7f0c8344c -> 450b42414


[hotfix] [optimizer] Normalize job plan operator formatting

When printing the job plan, the operator description is typically
formatted as the operator name followed by the user-given or generated
function name in parentheses. For example, "Reduce (My Function)".

This normalizes the node names to include a space between the operator
and function names.

This closes #4383


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53f2c1c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53f2c1c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53f2c1c2

Branch: refs/heads/master
Commit: 53f2c1c25b7866bb494602f9e68c2b237b2cf2c4
Parents: 7f0c834
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jul 21 13:47:13 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Nov 28 12:39:23 2017 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java  | 2 +-
 .../flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java | 2 +-
 .../flink/optimizer/operators/CartesianProductDescriptor.java      | 2 +-
 .../optimizer/operators/GroupReduceWithCombineProperties.java      | 2 +-
 .../optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java | 2 +-
 .../operators/HashFullOuterJoinBuildSecondDescriptor.java          | 2 +-
 .../flink/optimizer/operators/HashJoinBuildFirstProperties.java    | 2 +-
 .../optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java | 2 +-
 .../operators/HashLeftOuterJoinBuildSecondDescriptor.java          | 2 +-
 .../operators/HashRightOuterJoinBuildFirstDescriptor.java          | 2 +-
 .../operators/HashRightOuterJoinBuildSecondDescriptor.java         | 2 +-
 .../apache/flink/optimizer/operators/PartialGroupProperties.java   | 2 +-
 .../apache/flink/optimizer/plantranslate/JobGraphGenerator.java    | 2 +-
 13 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
index 9b53999..62cb4ca 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
@@ -46,7 +46,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode {
 	// --------------------------------------------------------------------------------------------
 	
 	public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-		this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", gProps, lProps, initialInput));
+		this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet ("+this.getOperator().getName()+")", gProps, lProps, initialInput));
 	}
 	
 	public SolutionSetPlanNode getCurrentSolutionSetPlanNode() {

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
index d54b5cf..4aced8b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
@@ -67,7 +67,7 @@ public abstract class AbstractSortMergeJoinDescriptor extends AbstractJoinDescri
 			inputOrders = tmp;
 		}
 
-		String nodeName = String.format("%s(%s)", getNodeName(), node.getOperator().getName());
+		String nodeName = String.format("%s (%s)", getNodeName(), node.getOperator().getName());
 		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2, inputOrders);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
index f48e297..d50c9b4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
@@ -93,7 +93,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual
 	
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
-		return new DualInputPlanNode(node, "Cross("+node.getOperator().getName()+")", in1, in2, getStrategy());
+		return new DualInputPlanNode(node, "Cross ("+node.getOperator().getName()+")", in1, in2, getStrategy());
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index accd11b..5521be1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -116,7 +116,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setParallelism(in.getSource().getParallelism());
 
-			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
+			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getOperator()
 					.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java
index 4985248..5ad0c7e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java
@@ -54,7 +54,7 @@ public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescripto
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
 
-		String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
+		String nodeName = "FullOuterJoin ("+node.getOperator().getName()+")";
 		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java
index d605a19..6468129 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java
@@ -53,7 +53,7 @@ public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescript
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
 
-		String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
+		String nodeName = "FullOuterJoin ("+node.getOperator().getName()+")";
 		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
index fec72a9..fc74b39 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
@@ -79,7 +79,7 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
 		else {
 			strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST;
 		}
-		return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
+		return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java
index ab4e106..f62a85d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java
@@ -54,7 +54,7 @@ public class HashLeftOuterJoinBuildFirstDescriptor extends AbstractJoinDescripto
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
 
-		String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")";
+		String nodeName = "LeftOuterJoin ("+node.getOperator().getName()+")";
 		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
index 8ed7969..85cf8f2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
@@ -57,7 +57,7 @@ public class HashLeftOuterJoinBuildSecondDescriptor extends AbstractJoinDescript
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
 
-		String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")";
+		String nodeName = "LeftOuterJoin ("+node.getOperator().getName()+")";
 		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
index 5ddba1c..ca19902 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
@@ -57,7 +57,7 @@ public class HashRightOuterJoinBuildFirstDescriptor extends AbstractJoinDescript
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
 
-		String nodeName = "RightOuterJoin("+node.getOperator().getName()+")";
+		String nodeName = "RightOuterJoin ("+node.getOperator().getName()+")";
 		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java
index 7bb8f1e..7e476cd 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java
@@ -54,7 +54,7 @@ public class HashRightOuterJoinBuildSecondDescriptor extends AbstractJoinDescrip
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
 
-		String nodeName = "RightOuterJoin("+node.getOperator().getName()+")";
+		String nodeName = "RightOuterJoin ("+node.getOperator().getName()+")";
 		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
index e4e6a7f..bb06171 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
@@ -51,7 +51,7 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
 		GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
 		combinerNode.setParallelism(in.getSource().getParallelism());
 
-		SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in,
+		SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getOperator().getName()+")", in,
 				DriverStrategy.SORTED_GROUP_COMBINE);
 		// sorting key info
 		combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/53f2c1c2/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index e3dcfad..248049e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1283,7 +1283,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
 		
 		// --------------------------- create the sync task ---------------------------
-		final JobVertex sync = new JobVertex("Sync(" + bulkNode.getNodeName() + ")");
+		final JobVertex sync = new JobVertex("Sync (" + bulkNode.getNodeName() + ")");
 		sync.setResources(bulkNode.getMinResources(), bulkNode.getPreferredResources());
 		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 		sync.setParallelism(1);


[2/2] flink git commit: [FLINK-6864] [core] Fix confusing "invalid POJO type" messages from TypeExtractor

Posted by gr...@apache.org.
[FLINK-6864] [core] Fix confusing "invalid POJO type" messages from TypeExtractor

This closes #4574


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/450b4241
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/450b4241
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/450b4241

Branch: refs/heads/master
Commit: 450b4241404055ed6638e354be421b83380827c5
Parents: 53f2c1c
Author: zjureel <zj...@gmail.com>
Authored: Wed Aug 23 10:30:31 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Nov 28 15:59:51 2017 -0500

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 |  3 +++
 .../flink/api/java/typeutils/TypeExtractor.java | 24 +++++++++++++++-----
 2 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/450b4241/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 1f0c466..d865d8e 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -115,6 +115,9 @@ conditions are fulfilled:
   or have a public getter- and a setter- method that follows the Java beans
   naming conventions for getters and setters.
 
+Note that when a user-defined data type can't be recognized as a POJO type, it must be processed as GenericType and
+serialized with Kryo.
+
 
 #### Creating a TypeInformation or TypeSerializer
 

http://git-wip-us.apache.org/repos/asf/flink/blob/450b4241/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 8ea2e1a..4767838 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1885,7 +1885,9 @@ public class TypeExtractor {
 			ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 
 		if (!Modifier.isPublic(clazz.getModifiers())) {
-			LOG.info("Class " + clazz.getName() + " is not public, cannot treat it as a POJO type. Will be handled as GenericType");
+			LOG.info("Class " + clazz.getName() + " is not public so it cannot be used as a POJO type " +
+				"and must be processed as GenericType. Please read the Flink documentation " +
+				"on \"Data Types & Serialization\" for details of the effect on performance.");
 			return new GenericTypeInfo<OUT>(clazz);
 		}
 
@@ -1900,7 +1902,9 @@ public class TypeExtractor {
 
 		List<Field> fields = getAllDeclaredFields(clazz, false);
 		if (fields.size() == 0) {
-			LOG.info("No fields detected for " + clazz + ". Cannot be used as a PojoType. Will be handled as GenericType");
+			LOG.info("No fields were detected for " + clazz + " so it cannot be used as a POJO type " +
+				"and must be processed as GenericType. Please read the Flink documentation " +
+				"on \"Data Types & Serialization\" for details of the effect on performance.");
 			return new GenericTypeInfo<OUT>(clazz);
 		}
 
@@ -1908,7 +1912,9 @@ public class TypeExtractor {
 		for (Field field : fields) {
 			Type fieldType = field.getGenericType();
 			if(!isValidPojoField(field, clazz, typeHierarchy)) {
-				LOG.info(clazz + " is not a valid POJO type because not all fields are valid POJO fields.");
+				LOG.info("Class " + clazz + " cannot be used as a POJO type because not all fields are valid POJO fields, " +
+					"and must be processed as GenericType. Please read the Flink documentation " +
+					"on \"Data Types & Serialization\" for details of the effect on performance.");
 				return null;
 			}
 			try {
@@ -1934,7 +1940,9 @@ public class TypeExtractor {
 		List<Method> methods = getAllDeclaredMethods(clazz);
 		for (Method method : methods) {
 			if (method.getName().equals("readObject") || method.getName().equals("writeObject")) {
-				LOG.info(clazz+" contains custom serialization methods we do not call.");
+				LOG.info("Class " + clazz + " contains custom serialization methods we do not call, so it cannot be used as a POJO type " +
+					"and must be processed as GenericType. Please read the Flink documentation " +
+					"on \"Data Types & Serialization\" for details of the effect on performance.");
 				return null;
 			}
 		}
@@ -1949,12 +1957,16 @@ public class TypeExtractor {
 				LOG.info(clazz + " is abstract or an interface, having a concrete " +
 						"type can increase performance.");
 			} else {
-				LOG.info(clazz + " must have a default constructor to be used as a POJO.");
+				LOG.info(clazz + " is missing a default constructor so it cannot be used as a POJO type " +
+					"and must be processed as GenericType. Please read the Flink documentation " +
+					"on \"Data Types & Serialization\" for details of the effect on performance.");
 				return null;
 			}
 		}
 		if(defaultConstructor != null && !Modifier.isPublic(defaultConstructor.getModifiers())) {
-			LOG.info("The default constructor of " + clazz + " should be Public to be used as a POJO.");
+			LOG.info("The default constructor of " + clazz + " is not Public so it cannot be used as a POJO type " +
+				"and must be processed as GenericType. Please read the Flink documentation " +
+				"on \"Data Types & Serialization\" for details of the effect on performance.");
 			return null;
 		}