You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/19 14:29:27 UTC

[1/3] flink git commit: [FLINK-4792] [docs] Update ML quickstart import

Repository: flink
Updated Branches:
  refs/heads/master ebcedc454 -> 428419d59


[FLINK-4792] [docs] Update ML quickstart import

This closes #2641


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27c25d01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27c25d01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27c25d01

Branch: refs/heads/master
Commit: 27c25d01b1c4eb25ef914907560c10eaa53973f8
Parents: ebcedc4
Author: Neelesh Srinivas Salian <ns...@cloudera.com>
Authored: Sun Oct 16 17:54:20 2016 -0700
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 19 09:44:07 2016 -0400

----------------------------------------------------------------------
 docs/dev/libs/ml/quickstart.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/27c25d01/docs/dev/libs/ml/quickstart.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/ml/quickstart.md b/docs/dev/libs/ml/quickstart.md
index 26b9275..50ca08a 100644
--- a/docs/dev/libs/ml/quickstart.md
+++ b/docs/dev/libs/ml/quickstart.md
@@ -87,7 +87,7 @@ We can load the data as a `DataSet[String]` first:
 
 {% highlight scala %}
 
-import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
 
 val env = ExecutionEnvironment.getExecutionEnvironment
 


[3/3] flink git commit: [FLINK-4586] [core] Broken AverageAccumulator

Posted by gr...@apache.org.
[FLINK-4586] [core] Broken AverageAccumulator

This closes #2639


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/428419d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/428419d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/428419d5

Branch: refs/heads/master
Commit: 428419d599d138f1647f84807d6d0224652f3d1b
Parents: cb78d70
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Oct 14 16:18:52 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 19 09:46:49 2016 -0400

----------------------------------------------------------------------
 .../common/accumulators/AverageAccumulator.java | 27 ++++++++++----------
 .../accumulators/AverageAccumulatorTest.java    | 18 ++++++++-----
 2 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/428419d5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
index 9c0f62f..67cf572 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
@@ -28,29 +28,30 @@ import org.apache.flink.annotation.Public;
 public class AverageAccumulator implements SimpleAccumulator<Double> {
 
 	private static final long serialVersionUID = 3672555084179165255L;
-	
-	private double localValue;
+
 	private long count;
 
+	private double sum;
+
 	@Override
 	public void add(Double value) {
 		this.count++;
-		this.localValue += value;
+		this.sum += value;
 	}
 
 	public void add(double value) {
 		this.count++;
-		this.localValue += value;
+		this.sum += value;
 	}
 
 	public void add(long value) {
 		this.count++;
-		this.localValue += value;
+		this.sum += value;
 	}
 
 	public void add(int value) {
 		this.count++;
-		this.localValue += value;
+		this.sum += value;
 	}
 
 	@Override
@@ -58,21 +59,21 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
 		if (this.count == 0) {
 			return 0.0;
 		}
-		return this.localValue / (double)this.count;
+		return this.sum / this.count;
 	}
 
 	@Override
 	public void resetLocal() {
 		this.count = 0;
-		this.localValue = 0;
+		this.sum = 0;
 	}
 
 	@Override
 	public void merge(Accumulator<Double, Double> other) {
 		if (other instanceof AverageAccumulator) {
-			AverageAccumulator temp = (AverageAccumulator)other;
-			this.count += temp.count;
-			this.localValue += other.getLocalValue();
+			AverageAccumulator avg = (AverageAccumulator)other;
+			this.count += avg.count;
+			this.sum += avg.sum;
 		} else {
 			throw new IllegalArgumentException("The merged accumulator must be AverageAccumulator.");
 		}
@@ -81,13 +82,13 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
 	@Override
 	public AverageAccumulator clone() {
 		AverageAccumulator average = new AverageAccumulator();
-		average.localValue = this.localValue;
 		average.count = this.count;
+		average.sum = this.sum;
 		return average;
 	}
 
 	@Override
 	public String toString() {
-		return "AverageAccumulator " + this.localValue + " count " + this.count;
+		return "AverageAccumulator " + this.getLocalValue() + " for " + this.count + " elements";
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/428419d5/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
index 9ebd27c..585511f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
@@ -83,12 +83,18 @@ public class AverageAccumulatorTest {
 
 	@Test
 	public void testMergeSuccess() {
-		AverageAccumulator average = new AverageAccumulator();
-		AverageAccumulator averageNew = new AverageAccumulator();
-		average.add(1);
-		averageNew.add(2);
-		average.merge(averageNew);
-		assertEquals(1.5, average.getLocalValue(), 0.0);
+		AverageAccumulator avg1 = new AverageAccumulator();
+		for (int i = 0; i < 5; i++) {
+			avg1.add(i);
+		}
+
+		AverageAccumulator avg2 = new AverageAccumulator();
+		for (int i = 5; i < 10; i++) {
+			avg2.add(i);
+		}
+
+		avg1.merge(avg2);
+		assertEquals(4.5, avg1.getLocalValue(), 0.0);
 	}
 
 	@Test


[2/3] flink git commit: [FLINK-4652] [streaming connectors] Automatically refresh AWS credentials

Posted by gr...@apache.org.
[FLINK-4652] [streaming connectors] Automatically refresh AWS credentials

By passing the credentials explicitly we become responsible for checking
and refreshing them before they expire. If they are not refreshed, requests
fail with AmazonServiceException: 'The security token included in
the request is expired'. Rely on the automatic refresh of credentials
instead by passing the AWSCredentialsProvider directly to the
AmazonKinesisClient and removing the getCredentials() call.

This closes #2635


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb78d70b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb78d70b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb78d70b

Branch: refs/heads/master
Commit: cb78d70b270bfb6962a9731d49d0d19c6e5bc88a
Parents: 27c25d0
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Oct 14 09:24:14 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 19 09:46:23 2016 -0400

----------------------------------------------------------------------
 .../apache/flink/streaming/connectors/kinesis/util/AWSUtil.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb78d70b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 81c0b6b..cff69e5 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -51,8 +51,9 @@ public class AWSUtil {
 		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
 			" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
 
+		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
 		AmazonKinesisClient client = new AmazonKinesisClient(
-			AWSUtil.getCredentialsProvider(configProps).getCredentials(), awsClientConfig);
+			AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
 
 		client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
 		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {