You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 15:40:03 UTC
[4/5] flink git commit: [FLINK-1418] [apis] Minor cleanups for eager
on-client print() statement.
[FLINK-1418] [apis] Minor cleanups for eager on-client print() statement.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6220f34b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6220f34b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6220f34b
Branch: refs/heads/master
Commit: 6220f34b626e9ed92b190042e0d69020e19461f1
Parents: 78d954b
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 13:49:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 14:50:32 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 51 ++++++++++++--------
.../flink/api/java/ExecutionEnvironment.java | 10 ++--
.../org/apache/flink/api/scala/DataSet.scala | 41 ++++++++++------
.../flink/api/scala/ExecutionEnvironment.scala | 6 +--
4 files changed, 66 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 133a083..157c666 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -338,7 +338,7 @@ public abstract class DataSet<T> {
*
* @see org.apache.flink.api.java.operators.AggregateOperator
*/
- public AggregateOperator<T> sum (int field) {
+ public AggregateOperator<T> sum(int field) {
return aggregate(Aggregations.SUM, field);
}
@@ -997,7 +997,7 @@ public abstract class DataSet<T> {
* <pre>
* {@code
* DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
- * initialState.iterateDelta(initialFeedbakSet, 100, 0);
+ * initialState.iterateDelta(initialFeedbackSet, 100, 0);
*
* DataSet<Tuple2<Long, Long>> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1)
* .join(iteration.getSolutionSet()).where(0).equalTo(0)
@@ -1236,7 +1236,7 @@ public abstract class DataSet<T> {
* @see TextOutputFormat
*/
public DataSink<String> writeAsFormattedText(String filePath, TextFormatter<T> formatter) {
- return this.map(new FormattingMapper<T>(formatter)).writeAsText(filePath);
+ return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath);
}
/**
@@ -1250,8 +1250,8 @@ public abstract class DataSet<T> {
*
* @see TextOutputFormat
*/
- public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, final TextFormatter<T> formatter) {
- return this.map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
+ public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, TextFormatter<T> formatter) {
+ return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
}
/**
@@ -1333,11 +1333,16 @@ public abstract class DataSet<T> {
}
/**
- * Writes a DataSet to the standard output stream (stdout).<br/>
- * For each element of the DataSet the result of {@link Object#toString()} is written.
- * This triggers execute() automatically.
+ * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
+ * the print() method. For programs that are executed in a cluster, this method needs
+ * to gather the contents of the DataSet back to the client, to print it there.
+ *
+ * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+ *
+ * <p>This method immediately triggers the program execution, similar to the
+ * {@link #collect()} and {@link #count()} methods.</p>
*/
- public void print() throws Exception{
+ public void print() throws Exception {
List<T> elements = this.collect();
for (T e: elements) {
System.out.println(e);
@@ -1345,6 +1350,23 @@ public abstract class DataSet<T> {
}
/**
+ * Prints the elements in a DataSet to the standard error stream {@link System#err} of the JVM that calls
+ * the printToErr() method. For programs that are executed in a cluster, this method needs
+ * to gather the contents of the DataSet back to the client, to print it there.
+ *
+ * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+ *
+ * <p>This method immediately triggers the program execution, similar to the
+ * {@link #collect()} and {@link #count()} methods.</p>
+ */
+ public void printToErr() throws Exception {
+ List<T> elements = this.collect();
+ for (T e: elements) {
+ System.err.println(e);
+ }
+ }
+
+ /**
* Writes a DataSet to the standard output stream (stdout).<br/>
* For each element of the DataSet the result of {@link Object#toString()} is written.
*
@@ -1354,17 +1376,6 @@ public abstract class DataSet<T> {
public DataSink<T> print(String sinkIdentifier) {
return output(new PrintingOutputFormat<T>(sinkIdentifier, false));
}
-
- /**
- * Writes a DataSet to the standard error stream (stderr).<br/>
- * For each element of the DataSet the result of {@link Object#toString()} is written.
- */
- public void printToErr() throws Exception{
- List<T> elements = this.collect();
- for (T e: elements) {
- System.err.println(e);
- }
- }
/**
* Writes a DataSet to the standard error stream (stderr).<br/>
http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 0f65b79..9c76409 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -94,9 +94,6 @@ import com.google.common.base.Preconditions;
*/
public abstract class ExecutionEnvironment {
-
- protected JobExecutionResult lastJobExecutionResult;
-
private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
/** The environment of the context (local by default, cluster if invoked through command line) */
@@ -116,8 +113,11 @@ public abstract class ExecutionEnvironment {
private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
- private ExecutionConfig config = new ExecutionConfig();
+ private final ExecutionConfig config = new ExecutionConfig();
+ /** Result from the latest execution, to make it retrievable when using eager execution methods */
+ protected JobExecutionResult lastJobExecutionResult;
+
/** Flag to indicate whether sinks have been cleared in previous executions */
private boolean wasExecuted = false;
@@ -240,6 +240,8 @@ public abstract class ExecutionEnvironment {
/**
* Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+ *
+ * @return The execution result from the latest job execution.
*/
public JobExecutionResult getLastJobExecutionResult(){
return this.lastJobExecutionResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5198157..e283e95 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1319,16 +1319,37 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
def output(outputFormat: OutputFormat[T]): DataSink[T] = {
javaSet.output(outputFormat)
}
-
+
/**
- * Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
- * each element.
- * This triggers execute() automatically.
+ * Prints the elements in a DataSet to the standard output stream [[System.out]] of the
+ * JVM that calls the print() method. For programs that are executed in a cluster, this
+ * method needs to gather the contents of the DataSet back to the client, to print it
+ * there.
+ *
+ * The string written for each element is defined by the [[AnyRef.toString]] method.
+ *
+ * This method immediately triggers the program execution, similar to the
+ * [[collect()]] and [[count()]] methods.
*/
- def print() = {
+ def print(): Unit = {
javaSet.print()
}
-
+
+ /**
+ * Prints the elements in a DataSet to the standard error stream [[System.err]] of the
+ * JVM that calls the print() method. For programs that are executed in a cluster, this
+ * method needs to gather the contents of the DataSet back to the client, to print it
+ * there.
+ *
+ * The string written for each element is defined by the [[AnyRef.toString]] method.
+ *
+ * This method immediately triggers the program execution, similar to the
+ * [[collect()]] and [[count()]] methods.
+ */
+ def printToErr(): Unit = {
+ javaSet.printToErr()
+ }
+
/**
* *
* Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed.
@@ -1340,14 +1361,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
}
/**
- * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
- * each element.
- */
- def printToErr() = {
- javaSet.printToErr()
- }
-
- /**
* Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed.
* This uses [[AnyRef.toString]] on each element.
* @param sinkIdentifier The string to prefix the output with.
http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 6cc327a..e01ff3b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -130,11 +130,9 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def getId: UUID = {
javaEnv.getId
}
-
-
+
/**
- * retrieves JobExecutionResult from last job execution (for "eager" print)
- * @return JobExecutionResult form last job execution
+ * Gets the JobExecutionResult of the last executed job.
*/
def getLastJobExecutionResult = javaEnv.getLastJobExecutionResult