You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/19 14:29:27 UTC
[1/3] flink git commit: [FLINK-4792] [docs] Update ML quickstart import
Repository: flink
Updated Branches:
refs/heads/master ebcedc454 -> 428419d59
[FLINK-4792] [docs] Update ML quickstart import
This closes #2641
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27c25d01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27c25d01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27c25d01
Branch: refs/heads/master
Commit: 27c25d01b1c4eb25ef914907560c10eaa53973f8
Parents: ebcedc4
Author: Neelesh Srinivas Salian <ns...@cloudera.com>
Authored: Sun Oct 16 17:54:20 2016 -0700
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 19 09:44:07 2016 -0400
----------------------------------------------------------------------
docs/dev/libs/ml/quickstart.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/27c25d01/docs/dev/libs/ml/quickstart.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/ml/quickstart.md b/docs/dev/libs/ml/quickstart.md
index 26b9275..50ca08a 100644
--- a/docs/dev/libs/ml/quickstart.md
+++ b/docs/dev/libs/ml/quickstart.md
@@ -87,7 +87,7 @@ We can load the data as a `DataSet[String]` first:
{% highlight scala %}
-import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
[3/3] flink git commit: [FLINK-4586] [core] Broken AverageAccumulator
Posted by gr...@apache.org.
[FLINK-4586] [core] Broken AverageAccumulator
This closes #2639
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/428419d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/428419d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/428419d5
Branch: refs/heads/master
Commit: 428419d599d138f1647f84807d6d0224652f3d1b
Parents: cb78d70
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Oct 14 16:18:52 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 19 09:46:49 2016 -0400
----------------------------------------------------------------------
.../common/accumulators/AverageAccumulator.java | 27 ++++++++++----------
.../accumulators/AverageAccumulatorTest.java | 18 ++++++++-----
2 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/428419d5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
index 9c0f62f..67cf572 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
@@ -28,29 +28,30 @@ import org.apache.flink.annotation.Public;
public class AverageAccumulator implements SimpleAccumulator<Double> {
private static final long serialVersionUID = 3672555084179165255L;
-
- private double localValue;
+
private long count;
+ private double sum;
+
@Override
public void add(Double value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(double value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(long value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
public void add(int value) {
this.count++;
- this.localValue += value;
+ this.sum += value;
}
@Override
@@ -58,21 +59,21 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
if (this.count == 0) {
return 0.0;
}
- return this.localValue / (double)this.count;
+ return this.sum / this.count;
}
@Override
public void resetLocal() {
this.count = 0;
- this.localValue = 0;
+ this.sum = 0;
}
@Override
public void merge(Accumulator<Double, Double> other) {
if (other instanceof AverageAccumulator) {
- AverageAccumulator temp = (AverageAccumulator)other;
- this.count += temp.count;
- this.localValue += other.getLocalValue();
+ AverageAccumulator avg = (AverageAccumulator)other;
+ this.count += avg.count;
+ this.sum += avg.sum;
} else {
throw new IllegalArgumentException("The merged accumulator must be AverageAccumulator.");
}
@@ -81,13 +82,13 @@ public class AverageAccumulator implements SimpleAccumulator<Double> {
@Override
public AverageAccumulator clone() {
AverageAccumulator average = new AverageAccumulator();
- average.localValue = this.localValue;
average.count = this.count;
+ average.sum = this.sum;
return average;
}
@Override
public String toString() {
- return "AverageAccumulator " + this.localValue + " count " + this.count;
+ return "AverageAccumulator " + this.getLocalValue() + " for " + this.count + " elements";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/428419d5/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
index 9ebd27c..585511f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/AverageAccumulatorTest.java
@@ -83,12 +83,18 @@ public class AverageAccumulatorTest {
@Test
public void testMergeSuccess() {
- AverageAccumulator average = new AverageAccumulator();
- AverageAccumulator averageNew = new AverageAccumulator();
- average.add(1);
- averageNew.add(2);
- average.merge(averageNew);
- assertEquals(1.5, average.getLocalValue(), 0.0);
+ AverageAccumulator avg1 = new AverageAccumulator();
+ for (int i = 0; i < 5; i++) {
+ avg1.add(i);
+ }
+
+ AverageAccumulator avg2 = new AverageAccumulator();
+ for (int i = 5; i < 10; i++) {
+ avg2.add(i);
+ }
+
+ avg1.merge(avg2);
+ assertEquals(4.5, avg1.getLocalValue(), 0.0);
}
@Test
[2/3] flink git commit: [FLINK-4652] [streaming connectors] Automatically refresh AWS credentials
Posted by gr...@apache.org.
[FLINK-4652] [streaming connectors] Automatically refresh AWS credentials
By using the credentials explicitly, we are responsible for checking and
refreshing them before they expire. If they are not refreshed, we
will encounter AmazonServiceException: 'The security token included in
the request is expired'. Make use of the automatic refreshing of credentials
by passing the AWSCredentialsProvider directly to AmazonKinesisClient and
removing the getCredentials() call.
This closes #2635
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb78d70b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb78d70b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb78d70b
Branch: refs/heads/master
Commit: cb78d70b270bfb6962a9731d49d0d19c6e5bc88a
Parents: 27c25d0
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Oct 14 09:24:14 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 19 09:46:23 2016 -0400
----------------------------------------------------------------------
.../apache/flink/streaming/connectors/kinesis/util/AWSUtil.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cb78d70b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 81c0b6b..cff69e5 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -51,8 +51,9 @@ public class AWSUtil {
awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+ // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
AmazonKinesisClient client = new AmazonKinesisClient(
- AWSUtil.getCredentialsProvider(configProps).getCredentials(), awsClientConfig);
+ AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {