Posted to commits@flink.apache.org by tr...@apache.org on 2015/11/20 16:19:46 UTC

[1/2] flink git commit: [FLINK-3052] [optimizer] Fix instantiation of bulk iteration candidates

Repository: flink
Updated Branches:
  refs/heads/release-0.10 2ff94205d -> 968d8b496


[FLINK-3052] [optimizer] Fix instantiation of bulk iteration candidates

When a candidate for a bulk iteration is instantiated, the optimizer creates candidates
for the step function. It then checks whether there exists a plan candidate for the step
function whose properties meet the properties of the input to the bulk iteration. Sometimes
it is necessary to append a no-op plan node to the end of the step function to generate the
correct properties. These new candidates have to be added to the final set of accepted
candidates.

This commit ensures that these new candidates are properly added to the set of accepted candidates.
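
For illustration only (not part of the commit), here is a minimal sketch of the kind of
program that exercises this code path, modeled on the new IterationsCompilerTest cases
below: the partial solution is joined twice inside the step function, so the optimizer
prefers to hash-partition the initial input and the feedback channel instead of
re-partitioning inside the iteration, and the appended no-op node at the end of the step
function must re-establish that partitioning. All class and helper names here are made up
for the example.

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.operators.IterativeDataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class BulkIterationPropertiesExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // two inputs keyed on field 0
            DataSet<Tuple2<Long, Long>> initial = env.generateSequence(1, 1000).map(new Duplicate());
            DataSet<Tuple2<Long, Long>> other = env.generateSequence(1, 1000).map(new Duplicate());

            // the partial solution feeds two joins inside the step function, so
            // pushing the hash partitioning out of the iteration is cheaper
            IterativeDataSet<Tuple2<Long, Long>> iteration = initial.iterate(10);

            DataSet<Tuple2<Long, Long>> step = iteration
                    .join(other).where(0).equalTo(0).with(new TakeFirst())
                    .join(other).where(0).equalTo(0).with(new TakeFirst());

            iteration.closeWith(step)
                    .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

            env.execute("bulk iteration properties example");
        }

        private static class Duplicate implements MapFunction<Long, Tuple2<Long, Long>> {
            @Override
            public Tuple2<Long, Long> map(Long value) {
                return new Tuple2<>(value, value);
            }
        }

        private static class TakeFirst implements
                JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
            @Override
            public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) {
                return first;
            }
        }
    }

Before this fix, the no-op candidates created for such a plan were dropped, and the
optimizer could fail to find a valid plan for the iteration.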

Fix test and add new iteration tests

Add predecessor operator and dynamic path information to the no-op operator in bulk iterations

This closes #1388.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd5dede2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd5dede2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd5dede2

Branch: refs/heads/release-0.10
Commit: dd5dede2a1999ac5378eb12d5fd63f1a635590c1
Parents: 2ff9420
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 19 14:24:15 2015 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 20 16:19:20 2015 +0100

----------------------------------------------------------------------
 .../flink/optimizer/dag/BulkIterationNode.java  |  14 +-
 .../flink/optimizer/dag/UnaryOperatorNode.java  |  14 +-
 .../optimizer/dag/WorksetIterationNode.java     |  15 +-
 .../flink/optimizer/util/NoOpUnaryUdfOp.java    |   2 +-
 .../flink/optimizer/IterationsCompilerTest.java | 144 +++++++++++++++++--
 5 files changed, 166 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd5dede2/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 3d95c22..556e2e3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
@@ -48,6 +47,7 @@ import org.apache.flink.optimizer.plan.NamedChannel;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.util.Visitor;
 
@@ -273,7 +273,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
 	}
 
-
+	@SuppressWarnings("unchecked")
 	@Override
 	protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels, 
 			List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
@@ -321,8 +321,10 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 					Channel toNoOp = new Channel(candidate);
 					globPropsReq.parameterizeChannel(toNoOp, false, rootConnection.getDataExchangeMode(), false);
 					locPropsReq.parameterizeChannel(toNoOp);
-					
-					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
+
+					NoOpUnaryUdfOp noOpUnaryUdfOp = new NoOpUnaryUdfOp<>();
+					noOpUnaryUdfOp.setInput(candidate.getProgramOperator());
+					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", noOpUnaryUdfOp, true);
 					rebuildPropertiesNode.setParallelism(candidate.getParallelism());
 					
 					SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
@@ -343,8 +345,10 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 					planDeleter.remove();
 				}
 			}
+
+			candidates.addAll(newCandidates);
 		}
-		
+
 		if (candidates.isEmpty()) {
 			return;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd5dede2/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
index 0c48033..0ec0264 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.optimizer.dag;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.DataStatistics;
@@ -30,11 +32,17 @@ import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
 
 public class UnaryOperatorNode extends SingleInputNode {
 	
-	private final List<OperatorDescriptorSingle> operator;
+	private final List<OperatorDescriptorSingle> operators;
 	
 	private final String name;
 
+	public UnaryOperatorNode(String name, SingleInputOperator<?, ?, ?> operator, boolean onDynamicPath) {
+		super(operator);
 
+		this.name = name;
+		this.operators = new ArrayList<>();
+		this.onDynamicPath = onDynamicPath;
+	}
 	
 	public UnaryOperatorNode(String name, FieldSet keys, OperatorDescriptorSingle ... operators) {
 		this(name, keys, Arrays.asList(operators));
@@ -43,13 +51,13 @@ public class UnaryOperatorNode extends SingleInputNode {
 	public UnaryOperatorNode(String name, FieldSet keys, List<OperatorDescriptorSingle> operators) {
 		super(keys);
 		
-		this.operator = operators;
+		this.operators = operators;
 		this.name = name;
 	}
 
 	@Override
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.operator;
+		return this.operators;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/dd5dede2/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index 15b9a50..7969a94 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -52,6 +52,7 @@ import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plan.WorksetPlanNode;
 import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
 import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
+import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -307,7 +308,8 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
 		this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE);
 	}
-	
+
+	@SuppressWarnings("unchecked")
 	@Override
 	protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
 			List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
@@ -367,9 +369,14 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 					globPropsReqWorkset.parameterizeChannel(toNoOp, false,
 															nextWorksetRootConnection.getDataExchangeMode(), false);
 					locPropsReqWorkset.parameterizeChannel(toNoOp);
-					
-					UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties",
-																							FieldList.EMPTY_LIST);
+
+					NoOpUnaryUdfOp noOpUnaryUdfOp = new NoOpUnaryUdfOp<>();
+					noOpUnaryUdfOp.setInput(candidate.getProgramOperator());
+
+					UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode(
+						"Rebuild Workset Properties",
+						noOpUnaryUdfOp,
+						true);
 					
 					rebuildWorksetPropertiesNode.setParallelism(candidate.getParallelism());
 					

http://git-wip-us.apache.org/repos/asf/flink/blob/dd5dede2/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
index cc4a4d6..8537b9c 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
@@ -36,7 +36,7 @@ public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunct
 	@SuppressWarnings("rawtypes")
 	public static final NoOpUnaryUdfOp INSTANCE = new NoOpUnaryUdfOp();
 	
-	private NoOpUnaryUdfOp() {
+	public NoOpUnaryUdfOp() {
 		// pass null here because we override getOutputType to return type
 		// of input operator
 		super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), null, "");

http://git-wip-us.apache.org/repos/asf/flink/blob/dd5dede2/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index bd5eb56..76337e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -20,11 +20,11 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.common.functions.*;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.junit.Test;
 
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.api.common.Plan;
@@ -33,11 +33,6 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -53,6 +48,8 @@ import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Collector;
 
+import java.util.Iterator;
+
 @SuppressWarnings({"serial", "unchecked"})
 public class IterationsCompilerTest extends CompilerTestBase {
 
@@ -157,8 +154,16 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
 			
 			WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-			
-			assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+			BulkIterationPlanNode bipn = (BulkIterationPlanNode)wipn.getInput1().getSource();
+
+			// the hash partitioning has been pushed out of the delta iteration into the bulk iteration
+			assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
+
+			// the input of the root step function is the last operator of the step function
+			// since the work has been pushed out of the bulk iteration, it has to guarantee the hash partitioning
+			for (Channel c : bipn.getRootOfStepFunction().getInputs()) {
+				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
+			}
 
 			assertEquals(DataExchangeMode.BATCH, wipn.getInput1().getDataExchangeMode());
 			assertEquals(DataExchangeMode.BATCH, wipn.getInput2().getDataExchangeMode());
@@ -223,7 +228,10 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
 			
 			DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
-			
+
+			// we do two join operations with input1 which is the partial solution
+			// it is cheaper to push the partitioning out so that the feedback channel and the
+			// initial input do the partitioning
 			doBulkIteration(input1, input2).output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
@@ -234,10 +242,17 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
 			
-			// check that work has not! been pushed out, as the end of the step function does not produce the necessary properties
+			// check that work has been pushed out
 			for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
+				assertEquals(ShipStrategyType.FORWARD, c.getShipStrategy());
+			}
+
+			// the end of the step function has to produce the necessary properties
+			for (Channel c : bipn.getRootOfStepFunction().getInputs()) {
 				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
 			}
+
+			assertEquals(ShipStrategyType.PARTITION_HASH, bipn.getInput().getShipStrategy());
 			
 			new JobGraphGenerator().compileJobGraph(op);
 		}
@@ -246,6 +261,44 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testIterationNotPushingWorkOut() throws Exception {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(8);
+
+			DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
+
+			DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
+
+			// Use input1 as partial solution. Partial solution is used in a single join operation --> it is cheaper
+			// to do the hash partitioning between the partial solution node and the join node
+			// instead of pushing the partitioning out
+			doSimpleBulkIteration(input1, input2).output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			assertEquals(1, op.getDataSinks().size());
+			assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);
+
+			BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+
+			// check that work has not been pushed out
+			for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
+				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
+			}
+
+			assertEquals(ShipStrategyType.FORWARD, bipn.getInput().getShipStrategy());
+
+			new JobGraphGenerator().compileJobGraph(op);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 	
 	@Test
 	public void testWorksetIterationPipelineBreakerPlacement() {
@@ -322,6 +375,64 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
+
+	/**
+	 * Tests that interesting properties can be pushed out of the bulk iteration. This requires
+	 * that a NoOp node is appended to the step function which re-establishes the properties of
+	 * the initial input. If this does not work, then Flink won't find a plan, because the optimizer
+	 * will not consider plans where the partitioning is done after the partial solution node in
+	 * this case (because of pruning).
+	 * @throws Exception
+	 */
+	@Test
+	public void testBulkIterationWithPartialSolutionProperties() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple1<Long>> input1 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
+			@Override
+			public Tuple1<Long> map(Long value) throws Exception {
+				return new Tuple1<>(value);
+			}
+		});
+
+		DataSet<Tuple1<Long>> input2 = env.generateSequence(1, 10).map(new MapFunction<Long, Tuple1<Long>>() {
+			@Override
+			public Tuple1<Long> map(Long value) throws Exception {
+				return new Tuple1<>(value);
+			}
+		});
+
+		DataSet<Tuple1<Long>> distinctInput = input1.distinct();
+
+		IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);
+
+		DataSet<Tuple1<Long>> iterationStep = iteration
+				.coGroup(input2)
+				.where(0)
+				.equalTo(0)
+				.with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
+					@Override
+					public void coGroup(
+							Iterable<Tuple1<Long>> first,
+							Iterable<Tuple1<Long>> second,
+							Collector<Tuple1<Long>> out) throws Exception {
+						Iterator<Tuple1<Long>> it = first.iterator();
+
+						if (it.hasNext()) {
+							out.collect(it.next());
+						}
+					}
+				});
+
+		DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);
+
+		iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		new JobGraphGenerator().compileJobGraph(op);
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -339,6 +450,19 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		// close the bulk iteration
 		return iteration.closeWith(changes);
 	}
+
+	public static DataSet<Tuple2<Long, Long>> doSimpleBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+
+		// open a bulk iteration
+		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(20);
+
+		DataSet<Tuple2<Long, Long>> changes = iteration
+				.join(edges).where(0).equalTo(0)
+				.flatMap(new FlatMapJoin());
+
+		// close the bulk iteration
+		return iteration.closeWith(changes);
+	}
 				
 		
 	public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {


[2/2] flink git commit: [doc] Fix Documentation formatting for recently added ExecutionEnvironment.readSequenceFile()

Posted by tr...@apache.org.
[doc] Fix Documentation formatting for recently added ExecutionEnvironment.readSequenceFile()

This closes #1389.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/968d8b49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/968d8b49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/968d8b49

Branch: refs/heads/release-0.10
Commit: 968d8b4969e0e5a2db3e389efd325e9ca198ad07
Parents: dd5dede
Author: smarthi <sm...@apache.org>
Authored: Fri Nov 20 09:34:55 2015 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 20 16:19:25 2015 +0100

----------------------------------------------------------------------
 docs/apis/programming_guide.md | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/968d8b49/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 8c96980..31b8969 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1783,10 +1783,12 @@ DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                                                  
 
 // read a file from the specified path of type TextInputFormat 
-DataSet<Tuple2<LongWritable, Text>> tuples = env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
+DataSet<Tuple2<LongWritable, Text>> tuples =
+ env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
                          
 // read a file from the specified path of type SequenceFileInputFormat
-DataSet<Tuple2<IntWritable, Text>> tuples = env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
+DataSet<Tuple2<IntWritable, Text>> tuples =
+ env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
 
 // creates a set from some given elements
 DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
@@ -1941,14 +1943,16 @@ val values = env.fromElements("Foo", "bar", "foobar", "fubar")
 
 // generate a number sequence
 val numbers = env.generateSequence(1, 10000000);
-{% endhighlight %}
 
 // read a file from the specified path of type TextInputFormat 
-val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
+val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+ classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
                          
 // read a file from the specified path of type SequenceFileInputFormat
-val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
+val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
+ "hdfs://nnHost:nnPort/path/to/file")
 
+{% endhighlight %}
 
 #### Configuring CSV Parsing