You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/24 14:46:19 UTC
[1/8] flink git commit: [FLINK-6669] set inputEncoding to UTF-8 in
scalastyle-maven-plugin
Repository: flink
Updated Branches:
refs/heads/master 61914abff -> f827d730e
[FLINK-6669] set inputEncoding to UTF-8 in scalastyle-maven-plugin
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f827d730
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f827d730
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f827d730
Branch: refs/heads/master
Commit: f827d730e40ac1c71fe974d4fd674e55ad530cdb
Parents: ed65c25
Author: lingjinjiang <li...@gmail.com>
Authored: Tue May 23 10:51:46 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200
----------------------------------------------------------------------
pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f827d730/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8c5f622..95fcf1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1337,6 +1337,7 @@ under the License.
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
+ <inputEncoding>UTF-8</inputEncoding>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
[8/8] flink git commit: [FLINK-6659] fix SavepointITCase leaving
temporary data behind
Posted by ch...@apache.org.
[FLINK-6659] fix SavepointITCase leaving temporary data behind
-> use a JUnit '@Rule' that does the cleanup
This closes #3962.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe671476
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe671476
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe671476
Branch: refs/heads/master
Commit: fe6714760b0feb688aa2e8169b1a73028e42afd4
Parents: b58a420
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon May 22 16:55:39 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/test/checkpointing/SavepointITCase.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe671476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 72a1b63..1c8a429 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -68,7 +68,6 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseS
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -143,7 +142,7 @@ public class SavepointITCase extends TestLogger {
final int numSlotsPerTaskManager = 2;
final int parallelism = numTaskManagers * numSlotsPerTaskManager;
final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- final File testRoot = folder.newFolder();
+ final File testRoot = folder.getRoot();
TestingCluster flink = null;
@@ -424,7 +423,7 @@ public class SavepointITCase extends TestLogger {
// Test deadline
final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- final File tmpDir = CommonTestUtils.createTempDirectory();
+ final File tmpDir = folder.getRoot();
final File savepointDir = new File(tmpDir, "savepoints");
TestingCluster flink = null;
@@ -494,7 +493,7 @@ public class SavepointITCase extends TestLogger {
// Test deadline
final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
- final File tmpDir = CommonTestUtils.createTempDirectory();
+ final File tmpDir = folder.getRoot();
final File savepointDir = new File(tmpDir, "savepoints");
TestingCluster flink = null;
[2/8] flink git commit: [FLINK-6320] fix unit test failing sometimes
when deleting a temp directory
Posted by ch...@apache.org.
[FLINK-6320] fix unit test failing sometimes when deleting a temp directory
This closes #3966.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ad3d140
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ad3d140
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ad3d140
Branch: refs/heads/master
Commit: 6ad3d140f35722055c9011dbee88d19319cfbfbe
Parents: fe67147
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon May 22 16:31:08 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200
----------------------------------------------------------------------
.../JobManagerHAJobGraphRecoveryITCase.java | 36 ++++++--------------
1 file changed, 10 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6ad3d140/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 052195a..80b8e18 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
@@ -58,7 +57,9 @@ import org.apache.flink.util.TestLogger;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.Some;
import scala.Tuple2;
@@ -68,7 +69,6 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
-import java.io.IOException;
import java.util.Collection;
import java.util.Queue;
import java.util.UUID;
@@ -88,32 +88,16 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
private final static FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
- private static final File FileStateBackendBasePath;
-
- static {
- try {
- FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
- }
- catch (IOException e) {
- throw new RuntimeException("Error in test setup. Could not create directory.", e);
- }
- }
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
@AfterClass
public static void tearDown() throws Exception {
ZooKeeper.shutdown();
-
- if (FileStateBackendBasePath != null) {
- FileUtils.deleteDirectory(FileStateBackendBasePath);
- }
}
@Before
public void cleanUp() throws Exception {
- if (FileStateBackendBasePath != null) {
- FileUtils.cleanDirectory(FileStateBackendBasePath);
- }
-
ZooKeeper.deleteAll();
}
@@ -125,7 +109,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
@Test
public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
- ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+ ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
// Configure the cluster
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
@@ -172,7 +156,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
@Test
public void testClientNonDetachedListeningBehaviour() throws Exception {
Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
- ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
+ ZooKeeper.getConnectString(), tempFolder.getRoot().getPath());
// Test actor system
ActorSystem testSystem = null;
@@ -397,10 +381,10 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
/**
* Fails the test if the recovery state (file state backend and ZooKeeper) is not clean.
*/
- private static void verifyCleanRecoveryState(Configuration config) throws Exception {
+ private void verifyCleanRecoveryState(Configuration config) throws Exception {
// File state backend empty
Collection<File> stateHandles = FileUtils.listFiles(
- FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+ tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
if (!stateHandles.isEmpty()) {
fail("File state backend is not clean: " + stateHandles);
@@ -429,10 +413,10 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
/**
* Fails the test if the recovery state (file state backend and ZooKeeper) has been cleaned.
*/
- private static void verifyRecoveryState(Configuration config) throws Exception {
+ private void verifyRecoveryState(Configuration config) throws Exception {
// File state backend empty
Collection<File> stateHandles = FileUtils.listFiles(
- FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+ tempFolder.getRoot(), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
if (stateHandles.isEmpty()) {
fail("File state backend has been cleaned: " + stateHandles);
[7/8] flink git commit: [FLINK-5376] Fix log statement in
UnorderedStreamElementQueue
Posted by ch...@apache.org.
[FLINK-5376] Fix log statement in UnorderedStreamElementQueue
This closes #3948.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed65c253
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed65c253
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed65c253
Branch: refs/heads/master
Commit: ed65c253478e688fdfbb149a1a75dc01b8537cab
Parents: 6ad3d14
Author: zentol <ch...@apache.org>
Authored: Fri May 19 14:33:18 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200
----------------------------------------------------------------------
.../api/operators/async/queue/UnorderedStreamElementQueue.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ed65c253/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index c9dc358..e6f71bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -148,7 +148,7 @@ public class UnorderedStreamElementQueue implements StreamElementQueue {
hasCompletedEntries.await();
}
- LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
+ LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
"({}/{}).", numberEntries, capacity);
return completedQueue.peek();
[3/8] flink git commit: [FLINK-6432] [py] Activate strict checkstyle
for flink-python
Posted by ch...@apache.org.
[FLINK-6432] [py] Activate strict checkstyle for flink-python
This closes #3969.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04fae536
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04fae536
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04fae536
Branch: refs/heads/master
Commit: 04fae5362bdc933c111c92a9cc3b3b2c1d71850e
Parents: ce573c6
Author: zentol <ch...@apache.org>
Authored: Tue May 2 18:03:48 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200
----------------------------------------------------------------------
flink-libraries/flink-python/pom.xml | 35 +++++++++++++++
.../flink/python/api/PythonOperationInfo.java | 14 ++++--
.../apache/flink/python/api/PythonOptions.java | 5 ++-
.../flink/python/api/PythonPlanBinder.java | 6 ++-
.../python/api/functions/PythonCoGroup.java | 6 ++-
.../api/functions/PythonMapPartition.java | 4 +-
.../api/functions/util/IdentityGroupReduce.java | 10 +++--
.../python/api/functions/util/KeyDiscarder.java | 7 +--
.../api/functions/util/NestedKeyDiscarder.java | 8 ++--
.../api/functions/util/SerializerMap.java | 7 +--
.../functions/util/StringDeserializerMap.java | 7 +--
.../util/StringTupleDeserializerMap.java | 7 +--
.../streaming/data/PythonDualInputSender.java | 1 +
.../streaming/data/PythonDualInputStreamer.java | 1 +
.../api/streaming/data/PythonReceiver.java | 13 +++---
.../python/api/streaming/data/PythonSender.java | 19 +++++++--
.../streaming/data/PythonSingleInputSender.java | 1 +
.../data/PythonSingleInputStreamer.java | 1 +
.../api/streaming/data/PythonStreamer.java | 2 +
.../data/SingleElementPushBackIterator.java | 1 +
.../api/streaming/plan/PythonPlanReceiver.java | 12 +++---
.../api/streaming/plan/PythonPlanSender.java | 4 +-
.../api/streaming/plan/PythonPlanStreamer.java | 10 +++--
.../api/streaming/util/SerializationUtils.java | 45 ++++++++++++++++++++
.../api/streaming/util/StreamPrinter.java | 1 +
.../python/api/types/CustomTypeWrapper.java | 1 +
.../apache/flink/python/api/util/SetCache.java | 1 +
.../flink/python/api/PythonPlanBinderTest.java | 6 ++-
.../data/SingleElementPushBackIteratorTest.java | 4 ++
29 files changed, 189 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml
index 6a57c3e..f6cea0c 100644
--- a/flink-libraries/flink-python/pom.xml
+++ b/flink-libraries/flink-python/pom.xml
@@ -48,6 +48,41 @@ under the License.
</archive>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 694c1b4..42eeffb 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -10,17 +10,23 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api;
-import java.io.IOException;
-import java.util.Arrays;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
+
+/**
+ * Generic container for all information required to an operation to the DataSet API.
+ */
public class PythonOperationInfo {
public String identifier;
public int parentID; //DataSet that an operation is applied on
@@ -121,7 +127,7 @@ public class PythonOperationInfo {
return sb.toString();
}
- public enum DatasizeHint {
+ enum DatasizeHint {
NONE,
TINY,
HUGE
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
index de053a0..4137c11 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.python.api;
import org.apache.flink.configuration.ConfigOption;
@@ -40,8 +41,8 @@ public class PythonOptions {
* The config parameter defining the size of the memory-mapped files, in kb.
* This value must be large enough to ensure that the largest serialized record can be written completely into
* the file.
- *
- * Every task will allocate 2 memory-files, each with this size.
+ *
+ * <p>Every task will allocate 2 memory-files, each with this size.
*/
public static final ConfigOption<Long> MMAP_FILE_SIZE =
key("python.mmap.size.kb")
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 2378d60..810c8cd 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api;
import org.apache.flink.api.common.JobExecutionResult;
@@ -44,6 +45,7 @@ import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import org.apache.flink.python.api.util.SetCache;
import org.apache.flink.runtime.filecache.FileCache;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,9 +106,9 @@ public class PythonPlanBinder {
tmpPlanFilesDir = configuredPlanTmpPath != null
? configuredPlanTmpPath
: System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
-
+
tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
-
+
String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
pythonLibraryPath = flinkRootDir != null
//command-line
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index a5e3e75..eaec88525 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -10,16 +10,18 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.functions;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.streaming.data.PythonDualInputStreamer;
import org.apache.flink.util.Collector;
+
import java.io.IOException;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
* CoGroupFunction that uses a python script.
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index 207ead9..924538e 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -10,9 +10,9 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.functions;
-import java.io.IOException;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -21,6 +21,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.streaming.data.PythonSingleInputStreamer;
import org.apache.flink.util.Collector;
+import java.io.IOException;
+
/**
* Multi-purpose class, usable by all operations using a python script with one input source and possibly differing
* in-/output types.
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
index 32fd22a..6d82e34 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java
@@ -10,15 +10,17 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.functions.util;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.util.Collector;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-/*
-Utility function to group and sort data.
-*/
+/**
+ * Utility function to group and sort data.
+ * @param <IN> input type
+ */
@ForwardedFields("*->*")
public class IdentityGroupReduce<IN> implements GroupReduceFunction<IN, IN> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java
index b2af7be..985eee5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java
@@ -10,15 +10,16 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
-/*
-Utility function to extract the value from a Key-Value Tuple.
-*/
+/**
+ * Utility function to extract the value from a Key-Value Tuple.
+ */
@ForwardedFields("f1->*")
public class KeyDiscarder <T> implements MapFunction<Tuple2<T, byte[]>, byte[]> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
index 4c8511e..a2c9e52 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
@@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
@@ -17,9 +18,10 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
-/*
-Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin.
-*/
+/**
+ * Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin.
+ * @param <IN> input type
+ */
@ForwardedFields("f0.f1->f0; f1.f1->f1")
public class NestedKeyDiscarder<IN> implements MapFunction<IN, Tuple2<byte[], byte[]>> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
index 116efd4..aaa7544 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
@@ -10,15 +10,16 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer;
-/*
-Utility function to serialize values, usually directly from data sources.
-*/
+/**
+ * Utility function to serialize values, usually directly from data sources.
+ */
public class SerializerMap<IN> implements MapFunction<IN, byte[]> {
private transient Serializer<IN> serializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
index 3d79b08..c72b4be 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
@@ -10,14 +10,15 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.ConfigConstants;
-/*
-Utility function to deserialize strings, used for non-CSV sinks.
-*/
+/**
+ * Utility function to deserialize strings, used for non-CSV sinks.
+ */
public class StringDeserializerMap implements MapFunction<byte[], String> {
@Override
public String map(byte[] value) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
index af5eac6..48082ac 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
@@ -10,15 +10,16 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.ConfigConstants;
-/*
-Utility function to deserialize strings, used for CSV sinks.
-*/
+/**
+ * Utility function to deserialize strings, used for CSV sinks.
+ */
public class StringTupleDeserializerMap implements MapFunction<byte[], Tuple1<String>> {
@Override
public Tuple1<String> map(byte[] value) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
index a16f522..de129b1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.configuration.Configuration;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
index 8c9fde9..bc3e546 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.api.common.functions.AbstractRichFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
index c7c1f7a..d8436c7 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -10,20 +10,21 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonOptions;
+import org.apache.flink.util.Collector;
+
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonOptions;
-import org.apache.flink.util.Collector;
/**
* This class is used to read data from memory-mapped files.
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
index 3d13271..2739fb1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.api.java.tuple.Tuple;
@@ -41,7 +42,7 @@ public abstract class PythonSender implements Serializable {
private transient MappedByteBuffer fileBuffer;
private final long mappedFileSizeBytes;
-
+
private final Configuration config;
protected PythonSender(Configuration config) {
@@ -59,7 +60,6 @@ public abstract class PythonSender implements Serializable {
outputFile.createNewFile();
outputRAF = new RandomAccessFile(outputFile, "rw");
-
outputRAF.setLength(mappedFileSizeBytes);
outputRAF.seek(mappedFileSizeBytes - 1);
outputRAF.writeByte(0);
@@ -125,16 +125,29 @@ public abstract class PythonSender implements Serializable {
throw new IllegalArgumentException("This object can't be serialized: " + value);
}
+ /**
+ * Interface for all serializers used by {@link PythonSender} classes to write container objects.
+ *
+ * <p>These serializers must be kept in sync with the python counterparts.
+ *
+ * @param <T> input type
+ */
protected abstract static class Serializer<T> {
protected ByteBuffer buffer;
+ /**
+ * Serializes the given value into a {@link ByteBuffer}.
+ *
+ * @param value value to serialize
+ * @return ByteBuffer containing serialized record
+ */
public ByteBuffer serialize(T value) {
serializeInternal(value);
buffer.flip();
return buffer;
}
- public abstract void serializeInternal(T value);
+ protected abstract void serializeInternal(T value);
}
private static class ArraySerializer extends Serializer<byte[]> {
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
index 74d0604..af093f1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.configuration.Configuration;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
index 6c0a13c..d596c39 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.api.common.functions.AbstractRichFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index cc4ba43..3fec947 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -20,6 +21,7 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializ
import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
import org.apache.flink.util.ExceptionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
index ef80c98..ea10b1d 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.util.Preconditions;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
index 6276302..a90f581 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -10,12 +10,17 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.plan;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.flink.api.java.tuple.Tuple;
+
import static org.apache.flink.python.api.streaming.data.PythonReceiver.createTuple;
import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_BOOLEAN;
import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_BYTE;
@@ -27,9 +32,6 @@ import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE
import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_NULL;
import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_STRING;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.python.api.types.CustomTypeWrapper;
-
/**
* Instances of this class can be used to receive data from the plan process.
*/
@@ -82,7 +84,7 @@ public class PythonPlanReceiver {
}
private abstract static class Deserializer<T> {
-
+
public T deserialize() throws IOException {
return deserialize(false);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
index 331c67e..db78b8c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -10,12 +10,14 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.plan;
+import org.apache.flink.python.api.streaming.util.SerializationUtils;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import org.apache.flink.python.api.streaming.util.SerializationUtils;
/**
* Instances of this class can be used to send data to the plan process.
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 9e93dda..d25f3d5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -10,12 +10,14 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.plan;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +42,7 @@ public class PythonPlanStreamer {
private Process process;
private ServerSocket server;
private Socket socket;
-
+
public PythonPlanStreamer(Configuration config) {
this.config = config;
}
@@ -90,7 +92,7 @@ public class PythonPlanStreamer {
return false;
}
while (true) {
- try {
+ try {
socket = server.accept();
sender = new PythonPlanSender(socket.getOutputStream());
receiver = new PythonPlanReceiver(socket.getInputStream());
@@ -107,7 +109,7 @@ public class PythonPlanStreamer {
}
}
}
-
+
public void finishPlanMode() {
try {
socket.close();
@@ -143,7 +145,7 @@ public class PythonPlanStreamer {
return ProcessState.RUNNING;
}
}
-
+
private enum ProcessState {
RUNNING,
FAILED,
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
index 721746a..1fe3ba1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.util;
import org.apache.flink.api.java.tuple.Tuple;
@@ -19,6 +20,9 @@ import org.apache.flink.python.api.types.CustomTypeWrapper;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+/**
+ * Utility class containing serializers for all supported types.
+ */
public class SerializationUtils {
public static final byte TYPE_BOOLEAN = 34;
public static final byte TYPE_BYTE = 33;
@@ -85,6 +89,14 @@ public class SerializationUtils {
return (Serializer<IN>) serializer;
}
+ /**
+ * Super class for all serializers used to serialize data. These serializers are used to serialize values emitted
+ * from java input formats.
+ *
+ * <p>These serializer smut be kept in sync with the python counterparts.
+ *
+ * @param <IN> input type
+ */
public abstract static class Serializer<IN> {
private byte[] typeInfo = null;
@@ -109,6 +121,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for {@link CustomTypeWrapper CustomTypeWrappers}.
+ */
public static class CustomTypeWrapperSerializer extends Serializer<CustomTypeWrapper> {
private final byte type;
@@ -129,6 +144,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for bytes.
+ */
public static class ByteSerializer extends Serializer<Byte> {
@Override
public byte[] serializeWithoutTypeInfo(Byte value) {
@@ -141,6 +159,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for booleans.
+ */
public static class BooleanSerializer extends Serializer<Boolean> {
@Override
public byte[] serializeWithoutTypeInfo(Boolean value) {
@@ -153,6 +174,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for ints.
+ */
public static class IntSerializer extends Serializer<Integer> {
@Override
public byte[] serializeWithoutTypeInfo(Integer value) {
@@ -167,6 +191,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for longs.
+ */
public static class LongSerializer extends Serializer<Long> {
@Override
public byte[] serializeWithoutTypeInfo(Long value) {
@@ -181,6 +208,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for strings.
+ */
public static class StringSerializer extends Serializer<String> {
@Override
public byte[] serializeWithoutTypeInfo(String value) {
@@ -196,6 +226,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for floats.
+ */
public static class FloatSerializer extends Serializer<Float> {
@Override
public byte[] serializeWithoutTypeInfo(Float value) {
@@ -210,6 +243,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for doubles.
+ */
public static class DoubleSerializer extends Serializer<Double> {
@Override
public byte[] serializeWithoutTypeInfo(Double value) {
@@ -224,6 +260,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for null.
+ */
public static class NullSerializer extends Serializer<Object> {
@Override
public byte[] serializeWithoutTypeInfo(Object value) {
@@ -236,6 +275,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for byte arrays.
+ */
public static class BytesSerializer extends Serializer<byte[]> {
@Override
public byte[] serializeWithoutTypeInfo(byte[] value) {
@@ -250,6 +292,9 @@ public class SerializationUtils {
}
}
+ /**
+ * A serializer for tuples.
+ */
public static class TupleSerializer extends Serializer<Tuple> {
private final Serializer<Object>[] serializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
index c6a1ede..313b983 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
@@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.util;
import org.apache.flink.configuration.ConfigConstants;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
index e16c3eb..f5674d5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java
@@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api.types;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java
index 750ba63..48011fc 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.python.api.util;
import org.apache.flink.api.java.DataSet;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index 20f3503..701ac73 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -10,6 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.flink.python.api;
import org.apache.flink.configuration.Configuration;
@@ -23,8 +24,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Tests for the PythonPlanBinder.
+ */
public class PythonPlanBinderTest extends JavaProgramTestBase {
-
+
@Override
protected boolean skipCollectionExecution() {
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
index 5e9eb42..a41aac3 100644
--- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.python.api.streaming.data;
import org.junit.Assert;
@@ -24,6 +25,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+/**
+ * Tests for the SingleElementPushBackIterator.
+ */
public class SingleElementPushBackIteratorTest {
@Test
[5/8] flink git commit: [FLINK-6431] [metrics] Activate strict
checkstyle in flink-metrics
Posted by ch...@apache.org.
[FLINK-6431] [metrics] Activate strict checkstyle in flink-metrics
This closes #3968.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce573c65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce573c65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce573c65
Branch: refs/heads/master
Commit: ce573c65e2573ae12c8f6a76cc580445886a0a74
Parents: 61914ab
Author: zentol <ch...@apache.org>
Authored: Tue May 2 17:31:27 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200
----------------------------------------------------------------------
.../apache/flink/metrics/CharacterFilter.java | 2 +-
.../org/apache/flink/metrics/Histogram.java | 2 +-
.../flink/metrics/HistogramStatistics.java | 6 +-
.../org/apache/flink/metrics/MeterView.java | 23 +-
.../org/apache/flink/metrics/MetricConfig.java | 1 +
.../org/apache/flink/metrics/MetricGroup.java | 20 +-
.../org/apache/flink/metrics/SimpleCounter.java | 3 +-
.../java/org/apache/flink/metrics/View.java | 3 +-
.../metrics/reporter/AbstractReporter.java | 1 +
.../flink/metrics/reporter/MetricReporter.java | 4 +-
.../apache/flink/metrics/util/TestMeter.java | 3 +
.../org/apache/flink/metrics/MeterViewTest.java | 4 +
.../apache/flink/metrics/datadog/DCounter.java | 8 +-
.../apache/flink/metrics/datadog/DGauge.java | 9 +-
.../apache/flink/metrics/datadog/DMeter.java | 6 +-
.../apache/flink/metrics/datadog/DMetric.java | 8 +-
.../apache/flink/metrics/datadog/DSeries.java | 8 +-
.../metrics/datadog/DatadogHttpClient.java | 8 +-
.../metrics/datadog/DatadogHttpReporter.java | 31 +-
.../flink/metrics/datadog/MetricType.java | 8 +-
.../metrics/datadog/DatadogHttpClientTest.java | 317 +++++++++----------
.../dropwizard/ScheduledDropwizardReporter.java | 11 +-
.../metrics/DropwizardHistogramStatistics.java | 3 +-
.../dropwizard/metrics/FlinkCounterWrapper.java | 4 +
.../dropwizard/metrics/FlinkGaugeWrapper.java | 5 +-
.../metrics/FlinkHistogramWrapper.java | 3 +-
.../dropwizard/metrics/FlinkMeterWrapper.java | 3 +-
.../metrics/HistogramStatisticsWrapper.java | 6 +-
.../ScheduledDropwizardReporterTest.java | 10 +-
.../DropwizardFlinkHistogramWrapperTest.java | 27 +-
.../metrics/DropwizardMeterWrapperTest.java | 3 +
.../metrics/FlinkMeterWrapperTest.java | 4 +
.../flink/metrics/ganglia/GangliaReporter.java | 15 +-
.../metrics/graphite/GraphiteReporter.java | 14 +-
.../apache/flink/metrics/jmx/JMXReporter.java | 46 ++-
.../flink/metrics/jmx/JMXReporterTest.java | 13 +-
.../jobmanager/JMXJobManagerMetricTest.java | 9 +
.../flink/metrics/statsd/StatsDReporter.java | 18 +-
.../metrics/statsd/StatsDReporterTest.java | 17 +-
flink-metrics/pom.xml | 39 +++
40 files changed, 418 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
index 1e9fbc4..10cd9ce 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/CharacterFilter.java
@@ -27,7 +27,7 @@ public interface CharacterFilter {
/**
* Filter the given string and generate a resulting string from it.
*
- * For example, one implementation could filter out invalid characters from the input string.
+ * <p>For example, one implementation could filter out invalid characters from the input string.
*
* @param input Input string
* @return Filtered result string
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
index af5c9b0..070a52a 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
@@ -21,7 +21,7 @@ package org.apache.flink.metrics;
/**
* Histogram interface to be used with Flink's metrics system.
*
- * The histogram allows to record values, get the current count of recorded values and create
+ * <p>The histogram allows to record values, get the current count of recorded values and create
* histogram statistics for the currently seen elements.
*/
public interface Histogram extends Metric {
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
index b2e4507..7a4b0a9 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/HistogramStatistics.java
@@ -21,7 +21,7 @@ package org.apache.flink.metrics;
/**
* Histogram statistics represent the current snapshot of elements recorded in the histogram.
*
- * The histogram statistics allow to calculate values for quantiles, the mean, the standard
+ * <p>The histogram statistics allow to calculate values for quantiles, the mean, the standard
* deviation, the minimum and the maximum.
*/
public abstract class HistogramStatistics {
@@ -35,14 +35,14 @@ public abstract class HistogramStatistics {
public abstract double getQuantile(double quantile);
/**
- * Returns the elements of the statistics' sample
+ * Returns the elements of the statistics' sample.
*
* @return Elements of the statistics' sample
*/
public abstract long[] getValues();
/**
- * Returns the size of the statistics' sample
+ * Returns the size of the statistics' sample.
*
* @return Size of the statistics' sample
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
index b87b983..8df0e86 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java
@@ -15,32 +15,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.metrics;
/**
* A MeterView provides an average rate of events per second over a given time period.
- *
- * The primary advantage of this class is that the rate is neither updated by the computing thread nor for every event.
+ *
+ * <p>The primary advantage of this class is that the rate is neither updated by the computing thread nor for every event.
* Instead, a history of counts is maintained that is updated in regular intervals by a background thread. From this
* history a rate is derived on demand, which represents the average rate of events over the given time span.
- *
- * Setting the time span to a low value reduces memory-consumption and will more accurately report short-term changes.
+ *
+ * <p>Setting the time span to a low value reduces memory-consumption and will more accurately report short-term changes.
* The minimum value possible is {@link View#UPDATE_INTERVAL_SECONDS}.
* A high value in turn increases memory-consumption, since a longer history has to be maintained, but will result in
* smoother transitions between rates.
- *
- * The events are counted by a {@link Counter}.
+ *
+ * <p>The events are counted by a {@link Counter}.
*/
public class MeterView implements Meter, View {
- /** The underlying counter maintaining the count */
+ /** The underlying counter maintaining the count. */
private final Counter counter;
- /** The time-span over which the average is calculated */
+ /** The time-span over which the average is calculated. */
private final int timeSpanInSeconds;
- /** Circular array containing the history of values */
+ /** Circular array containing the history of values. */
private final long[] values;
- /** The index in the array for the current time */
+ /** The index in the array for the current time. */
private int time = 0;
- /** The last rate we computed */
+ /** The last rate we computed. */
private double currentRate = 0;
public MeterView(int timeSpanInSeconds) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
index 4a2e616..699afdf 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricConfig.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.metrics;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index d4221ef..39ab3b6 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -22,10 +22,10 @@ import java.util.Map;
/**
* A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
- *
+ *
* <p>Instances of this class can be used to register new metrics with Flink and to create a nested
* hierarchy based on the group names.
- *
+ *
* <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name.
*/
public interface MetricGroup {
@@ -69,7 +69,7 @@ public interface MetricGroup {
* @return the given counter
*/
<C extends Counter> C counter(String name, C counter);
-
+
/**
* Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
*
@@ -95,7 +95,7 @@ public interface MetricGroup {
*
* @param name name of the histogram
* @param histogram histogram to register
- * @param <H> histogram type
+ * @param <H> histogram type
* @return the registered histogram
*/
<H extends Histogram> H histogram(String name, H histogram);
@@ -105,7 +105,7 @@ public interface MetricGroup {
*
* @param name name of the histogram
* @param histogram histogram to register
- * @param <H> histogram type
+ * @param <H> histogram type
* @return the registered histogram
*/
<H extends Histogram> H histogram(int name, H histogram);
@@ -156,7 +156,7 @@ public interface MetricGroup {
/**
* Gets the scope as an array of the scope components, for example
- * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}
+ * {@code ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]}.
*
* @see #getMetricIdentifier(String)
* @see #getMetricIdentifier(String, CharacterFilter)
@@ -165,15 +165,15 @@ public interface MetricGroup {
/**
* Returns a map of all variables and their associated value, for example
- * {@code {"<host>"="host-7", "<tm_id>"="taskmanager-2"}}
- *
+ * {@code {"<host>"="host-7", "<tm_id>"="taskmanager-2"}}.
+ *
* @return map of all variables and their associated value
*/
Map<String, String> getAllVariables();
/**
* Returns the fully qualified metric name, for example
- * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+ * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
*
* @param metricName metric name
* @return fully qualified metric name
@@ -182,7 +182,7 @@ public interface MetricGroup {
/**
* Returns the fully qualified metric name, for example
- * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+ * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}.
*
* @param metricName metric name
* @param filter character filter which is applied to the scope components if not null.
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
index 6ec3b28..2efc0c9 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.metrics;
/**
@@ -22,7 +23,7 @@ package org.apache.flink.metrics;
*/
public class SimpleCounter implements Counter {
- /** the current count */
+ /** the current count. */
private long count;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
index 1780130..2bbb3a9 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/View.java
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.metrics;
/**
* An interface for metrics which should be updated in regular intervals by a background thread.
*/
public interface View {
- /** The interval in which metrics are updated */
+ /** The interval in which metrics are updated. */
int UPDATE_INTERVAL_SECONDS = 5;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
index 0c8d9ad..c0aeb4b 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
index ee92a10..5c8085f 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java
@@ -24,7 +24,7 @@ import org.apache.flink.metrics.MetricGroup;
/**
* Reporters are used to export {@link Metric Metrics} to an external backend.
- *
+ *
* <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
* public no-argument constructor.
*/
@@ -37,7 +37,7 @@ public interface MetricReporter {
/**
* Configures this reporter. Since reporters are instantiated generically and hence parameter-less,
* this method is the place where the reporters set their basic fields based on configuration values.
- *
+ *
* <p>This method is always called first on a newly instantiated reporter.
*
* @param config A properties object that contains all parameters set for this reporter.
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
index b44b996..b1ec3a3 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
@@ -20,6 +20,9 @@ package org.apache.flink.metrics.util;
import org.apache.flink.metrics.Meter;
+/**
+ * A dummy {@link Meter} implementation.
+ */
public class TestMeter implements Meter {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
index 8ba298f..a7a63b0 100644
--- a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
+++ b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/MeterViewTest.java
@@ -15,12 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.metrics;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the MeterView.
+ */
public class MeterViewTest {
@Test
public void testGetCount() {
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
index 58abbd6..e187efb 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
@@ -23,8 +23,8 @@ import org.apache.flink.metrics.Counter;
import java.util.List;
/**
- * Mapping of counter between Flink and Datadog
- * */
+ * Mapping of counter between Flink and Datadog.
+ */
public class DCounter extends DMetric {
private final Counter counter;
@@ -35,8 +35,8 @@ public class DCounter extends DMetric {
/**
* Visibility of this method must not be changed
- * since we deliberately not map it to json object in a Datadog-defined format
- * */
+ * since we deliberately not map it to json object in a Datadog-defined format.
+ */
@Override
public Number getMetricValue() {
return counter.getCount();
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
index 8deb117..ba97d59 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
@@ -18,14 +18,13 @@
package org.apache.flink.metrics.datadog;
-
import org.apache.flink.metrics.Gauge;
import java.util.List;
/**
- * Mapping of gauge between Flink and Datadog
- * */
+ * Mapping of gauge between Flink and Datadog.
+ */
public class DGauge extends DMetric {
private final Gauge<Number> gauge;
@@ -36,8 +35,8 @@ public class DGauge extends DMetric {
/**
* Visibility of this method must not be changed
- * since we deliberately not map it to json object in a Datadog-defined format
- * */
+ * since we deliberately not map it to json object in a Datadog-defined format.
+ */
@Override
public Number getMetricValue() {
return gauge.getValue();
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
index 181a00c..68c61cf 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
@@ -23,10 +23,10 @@ import org.apache.flink.metrics.Meter;
import java.util.List;
/**
- * Mapping of meter between Flink and Datadog
+ * Mapping of meter between Flink and Datadog.
*
- * Only consider rate of the meter, due to Datadog HTTP API's limited support of meter
- * */
+ * <p>Only consider rate of the meter, due to Datadog HTTP API's limited support of meter
+ */
public class DMeter extends DMetric {
private final Meter meter;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
index 3f9d6ff..e55a9f0 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
@@ -25,16 +25,16 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Abstract metric of Datadog for serialization
- * */
+ * Abstract metric of Datadog for serialization.
+ */
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class DMetric {
private static final long MILLIS_TO_SEC = 1000L;
/**
* Names of metric/type/tags field and their getters must not be changed
- * since they are mapped to json objects in a Datadog-defined format
- * */
+ * since they are mapped to json objects in a Datadog-defined format.
+ */
private final String metric; // Metric name
private final MetricType type;
private final String host;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
index fb0bb09..54d907c 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
@@ -22,13 +22,13 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Json serialization between Flink and Datadog
- **/
+ * Json serialization between Flink and Datadog.
+ */
public class DSeries {
/**
* Names of series field and its getters must not be changed
- * since they are mapped to json objects in a Datadog-defined format
- * */
+ * since they are mapped to json objects in a Datadog-defined format.
+ */
private List<DMetric> series;
public DSeries() {
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
index c94a2b7..086966b 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
@@ -23,15 +23,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
-import okhttp3.Response;
import okhttp3.RequestBody;
+import okhttp3.Response;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
- * Http client talking to Datadog
- * */
+ * Http client talking to Datadog.
+ */
public class DatadogHttpClient{
private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s";
private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s";
@@ -69,7 +69,7 @@ public class DatadogHttpClient{
throw new IllegalArgumentException(
String.format("API key: %s is invalid", apiKey));
}
- } catch(IOException e) {
+ } catch (IOException e) {
throw new IllegalStateException("Failed contacting Datadog to validate API key", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
index fcb5c4b..a47b2bf 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -20,13 +20,14 @@ package org.apache.flink.metrics.datadog;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,12 +37,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
/**
- * Metric Reporter for Datadog
+ * Metric Reporter for Datadog.
*
- * Variables in metrics scope will be sent to Datadog as tags
- * */
+ * <p>Variables in metrics scope will be sent to Datadog as tags.
+ */
public class DatadogHttpReporter implements MetricReporter, Scheduled {
private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class);
private static final String HOST_VARIABLE = "<host>";
@@ -146,20 +146,20 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
}
/**
- * Get config tags from config 'metrics.reporter.dghttp.tags'
- * */
+ * Get config tags from config 'metrics.reporter.dghttp.tags'.
+ */
private List<String> getTagsFromConfig(String str) {
return Arrays.asList(str.split(","));
}
/**
- * Get tags from MetricGroup#getAllVariables(), excluding 'host'
- * */
+ * Get tags from MetricGroup#getAllVariables(), excluding 'host'.
+ */
private List<String> getTagsFromMetricGroup(MetricGroup metricGroup) {
List<String> tags = new ArrayList<>();
for (Map.Entry<String, String> entry: metricGroup.getAllVariables().entrySet()) {
- if(!entry.getKey().equals(HOST_VARIABLE)) {
+ if (!entry.getKey().equals(HOST_VARIABLE)) {
tags.add(getVariableName(entry.getKey()) + ":" + entry.getValue());
}
}
@@ -167,23 +167,20 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
return tags;
}
- /**
- * Get host from MetricGroup#getAllVariables() if it exists; returns Null otherwise
- * */
private String getHostFromMetricGroup(MetricGroup metricGroup) {
return metricGroup.getAllVariables().get(HOST_VARIABLE);
}
/**
- * Given "<xxx>", return "xxx"
- * */
+ * Removes leading and trailing angle brackets.
+ */
private String getVariableName(String str) {
return str.substring(1, str.length() - 1);
}
/**
- * Compact metrics in batch, serialize them, and send to Datadog via HTTP
- * */
+ * Compact metrics in batch, serialize them, and send to Datadog via HTTP.
+ */
static class DatadogHttpRequest {
private final DSeries series;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
index 97f9b29..9681514 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
@@ -19,12 +19,12 @@
package org.apache.flink.metrics.datadog;
/**
- * Metric types supported by Datadog
- * */
+ * Metric types supported by Datadog.
+ */
public enum MetricType {
/**
* Names of 'gauge' and 'counter' must not be changed
- * since they are mapped to json objects in a Datadog-defined format
- * */
+ * since they are mapped to json objects in a Datadog-defined format.
+ */
gauge, counter
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
index bda5d47..0680252 100644
--- a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
+++ b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
@@ -18,13 +18,13 @@
package org.apache.flink.metrics.datadog;
-import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.Before;
import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -35,165 +35,164 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
-@RunWith(Enclosed.class)
+/**
+ * Tests for the DatadogHttpClient.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DMetric.class)
public class DatadogHttpClientTest {
- public static class TestApiKey {
- @Test(expected = IllegalArgumentException.class)
- public void testClientWithEmptyKey() {
- new DatadogHttpClient("");
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testClientWithNullKey() {
- new DatadogHttpClient(null);
- }
+
+ private static List<String> tags = Arrays.asList("tag1", "tag2");
+
+ private static final long MOCKED_SYSTEM_MILLIS = 123L;
+
+ @Before
+ public void mockSystemMillis() {
+ PowerMockito.mockStatic(DMetric.class);
+ PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testClientWithEmptyKey() {
+ new DatadogHttpClient("");
}
- @RunWith(PowerMockRunner.class)
- @PrepareForTest(DMetric.class)
- public static class TestSerialization {
- private static List<String> tags = Arrays.asList("tag1", "tag2");
-
- private static final long MOCKED_SYSTEM_MILLIS = 123L;
-
- @Before
- public void mockSystemMillis() {
- PowerMockito.mockStatic(DMetric.class);
- PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS);
- }
-
- @Test
- public void serializeGauge() throws JsonProcessingException {
-
- DGauge g = new DGauge(new Gauge<Number>() {
- @Override
- public Number getValue() {
- return 1;
- }
- }, "testCounter", "localhost", tags);
-
- assertEquals(
- "{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
- DatadogHttpClient.serialize(g));
- }
-
- @Test
- public void serializeGaugeWithoutHost() throws JsonProcessingException {
-
- DGauge g = new DGauge(new Gauge<Number>() {
- @Override
- public Number getValue() {
- return 1;
- }
- }, "testCounter", null, tags);
-
- assertEquals(
- "{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
- DatadogHttpClient.serialize(g));
- }
-
- @Test
- public void serializeCounter() throws JsonProcessingException {
- DCounter c = new DCounter(new Counter() {
- @Override
- public void inc() {}
-
- @Override
- public void inc(long n) {}
-
- @Override
- public void dec() {}
-
- @Override
- public void dec(long n) {}
-
- @Override
- public long getCount() {
- return 1;
- }
- }, "testCounter", "localhost", tags);
-
- assertEquals(
- "{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
- DatadogHttpClient.serialize(c));
- }
-
- @Test
- public void serializeCounterWithoutHost() throws JsonProcessingException {
- DCounter c = new DCounter(new Counter() {
- @Override
- public void inc() {}
-
- @Override
- public void inc(long n) {}
-
- @Override
- public void dec() {}
-
- @Override
- public void dec(long n) {}
-
- @Override
- public long getCount() {
- return 1;
- }
- }, "testCounter", null, tags);
-
- assertEquals(
- "{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
- DatadogHttpClient.serialize(c));
- }
-
- @Test
- public void serializeMeter() throws JsonProcessingException {
-
- DMeter m = new DMeter(new Meter() {
- @Override
- public void markEvent() {}
-
- @Override
- public void markEvent(long n) {}
-
- @Override
- public double getRate() {
- return 1;
- }
-
- @Override
- public long getCount() {
- return 0;
- }
- }, "testMeter","localhost", tags);
-
- assertEquals(
- "{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
- DatadogHttpClient.serialize(m));
- }
-
- @Test
- public void serializeMeterWithoutHost() throws JsonProcessingException {
-
- DMeter m = new DMeter(new Meter() {
- @Override
- public void markEvent() {}
-
- @Override
- public void markEvent(long n) {}
-
- @Override
- public double getRate() {
- return 1;
- }
-
- @Override
- public long getCount() {
- return 0;
- }
- }, "testMeter", null, tags);
-
- assertEquals(
- "{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
- DatadogHttpClient.serialize(m));
- }
+ @Test(expected = IllegalArgumentException.class)
+ public void testClientWithNullKey() {
+ new DatadogHttpClient(null);
+ }
+
+ @Test
+ public void serializeGauge() throws JsonProcessingException {
+
+ DGauge g = new DGauge(new Gauge<Number>() {
+ @Override
+ public Number getValue() {
+ return 1;
+ }
+ }, "testCounter", "localhost", tags);
+
+ assertEquals(
+ "{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+ DatadogHttpClient.serialize(g));
+ }
+
+ @Test
+ public void serializeGaugeWithoutHost() throws JsonProcessingException {
+
+ DGauge g = new DGauge(new Gauge<Number>() {
+ @Override
+ public Number getValue() {
+ return 1;
+ }
+ }, "testCounter", null, tags);
+
+ assertEquals(
+ "{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+ DatadogHttpClient.serialize(g));
+ }
+
+ @Test
+ public void serializeCounter() throws JsonProcessingException {
+ DCounter c = new DCounter(new Counter() {
+ @Override
+ public void inc() {}
+
+ @Override
+ public void inc(long n) {}
+
+ @Override
+ public void dec() {}
+
+ @Override
+ public void dec(long n) {}
+
+ @Override
+ public long getCount() {
+ return 1;
+ }
+ }, "testCounter", "localhost", tags);
+
+ assertEquals(
+ "{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+ DatadogHttpClient.serialize(c));
+ }
+
+ @Test
+ public void serializeCounterWithoutHost() throws JsonProcessingException {
+ DCounter c = new DCounter(new Counter() {
+ @Override
+ public void inc() {}
+
+ @Override
+ public void inc(long n) {}
+
+ @Override
+ public void dec() {}
+
+ @Override
+ public void dec(long n) {}
+
+ @Override
+ public long getCount() {
+ return 1;
+ }
+ }, "testCounter", null, tags);
+
+ assertEquals(
+ "{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+ DatadogHttpClient.serialize(c));
+ }
+
+ @Test
+ public void serializeMeter() throws JsonProcessingException {
+
+ DMeter m = new DMeter(new Meter() {
+ @Override
+ public void markEvent() {}
+
+ @Override
+ public void markEvent(long n) {}
+
+ @Override
+ public double getRate() {
+ return 1;
+ }
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+ }, "testMeter", "localhost", tags);
+
+ assertEquals(
+ "{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+ DatadogHttpClient.serialize(m));
+ }
+
+ @Test
+ public void serializeMeterWithoutHost() throws JsonProcessingException {
+
+ DMeter m = new DMeter(new Meter() {
+ @Override
+ public void markEvent() {}
+
+ @Override
+ public void markEvent(long n) {}
+
+ @Override
+ public double getRate() {
+ return 1;
+ }
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+ }, "testMeter", null, tags);
+
+ assertEquals(
+ "{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+ DatadogHttpClient.serialize(m));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
index 380abc4..e3184da 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -18,9 +18,6 @@
package org.apache.flink.dropwizard;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-import com.codahale.metrics.ScheduledReporter;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
@@ -39,6 +36,10 @@ import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Reporter;
+import com.codahale.metrics.ScheduledReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -161,7 +162,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
synchronized (this) {
String fullName;
-
+
if (metric instanceof Counter) {
fullName = counters.remove(metric);
} else if (metric instanceof Gauge) {
@@ -173,7 +174,7 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch
} else {
fullName = null;
}
-
+
if (fullName != null) {
registry.remove(fullName);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
index 6f4eab2..8acf401 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramStatistics.java
@@ -18,9 +18,10 @@
package org.apache.flink.dropwizard.metrics;
-import com.codahale.metrics.Snapshot;
import org.apache.flink.metrics.HistogramStatistics;
+import com.codahale.metrics.Snapshot;
+
/**
* Dropwizard histogram statistics implementation returned by {@link DropwizardHistogramWrapper}.
* The statistics class wraps a {@link Snapshot} instance and forwards the method calls accordingly.
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
index a44c3f5..b4ea8dc 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkCounterWrapper.java
@@ -15,10 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.dropwizard.metrics;
import org.apache.flink.metrics.Counter;
+/**
+ * A wrapper that allows a Flink counter to be used as a DropWizard counter.
+ */
public class FlinkCounterWrapper extends com.codahale.metrics.Counter {
private final Counter counter;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
index 058ecad..0d53a9e 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
@@ -20,8 +20,11 @@ package org.apache.flink.dropwizard.metrics;
import org.apache.flink.metrics.Gauge;
+/**
+ * A wrapper that allows a Flink gauge to be used as a DropWizard gauge.
+ */
public class FlinkGaugeWrapper<T> implements com.codahale.metrics.Gauge<T> {
-
+
private final Gauge<T> gauge;
public FlinkGaugeWrapper(Gauge<T> gauge) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
index 8bd8078..d2167d0 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkHistogramWrapper.java
@@ -18,9 +18,10 @@
package org.apache.flink.dropwizard.metrics;
-import com.codahale.metrics.Snapshot;
import org.apache.flink.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+
/**
* Wrapper to use a Flink {@link Histogram} as a Dropwizard {@link com.codahale.metrics.Histogram}.
* This is necessary to report Flink's histograms via the Dropwizard
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
index d0b8483..213c21c 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
@@ -18,9 +18,10 @@
package org.apache.flink.dropwizard.metrics;
-import com.codahale.metrics.Clock;
import org.apache.flink.metrics.Meter;
+import com.codahale.metrics.Clock;
+
/**
* Wrapper to use a Flink {@link Meter} as a Dropwizard {@link com.codahale.metrics.Meter}.
* This is necessary to report Flink's meters via the Dropwizard
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
index 6d3a69b..7b1ff74 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/HistogramStatisticsWrapper.java
@@ -18,9 +18,10 @@
package org.apache.flink.dropwizard.metrics;
-import com.codahale.metrics.Snapshot;
import org.apache.flink.metrics.HistogramStatistics;
+import com.codahale.metrics.Snapshot;
+
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
@@ -39,6 +40,7 @@ class HistogramStatisticsWrapper extends Snapshot {
HistogramStatisticsWrapper(HistogramStatistics histogramStatistics) {
this.histogramStatistics = histogramStatistics;
}
+
@Override
public double getValue(double quantile) {
return histogramStatistics.getQuantile(quantile);
@@ -76,7 +78,7 @@ class HistogramStatisticsWrapper extends Snapshot {
@Override
public void dump(OutputStream output) {
- try(PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(output, UTF_8))){
+ try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(output, UTF_8))) {
for (Long value : histogramStatistics.getValues()) {
printWriter.printf("%d%n", value);
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 73e7f0b..85769c2 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.dropwizard;
-import com.codahale.metrics.ScheduledReporter;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -40,6 +39,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.util.AbstractID;
+
+import com.codahale.metrics.ScheduledReporter;
import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
@@ -49,6 +50,9 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the ScheduledDropwizardReporter.
+ */
public class ScheduledDropwizardReporterTest {
@Test
@@ -199,7 +203,6 @@ public class ScheduledDropwizardReporterTest {
assertEquals(1, rep.getGauges().size());
assertEquals(1, rep.registry.getGauges().size());
-
rep.notifyOfRemovedMetric(c, "counter", mp);
assertEquals(0, rep.getCounters().size());
assertEquals(0, rep.registry.getCounters().size());
@@ -217,6 +220,9 @@ public class ScheduledDropwizardReporterTest {
assertEquals(0, rep.registry.getGauges().size());
}
+ /**
+ * Dummy test reporter.
+ */
public static class TestingScheduledDropwizardReporter extends ScheduledDropwizardReporter {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 8f7796c..63765ae 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -18,14 +18,6 @@
package org.apache.flink.dropwizard.metrics;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.SlidingWindowReservoir;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
@@ -36,6 +28,15 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import org.junit.Test;
import java.util.ArrayList;
@@ -52,6 +53,9 @@ import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the DropwizardFlinkHistogramWrapper.
+ */
public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
/**
@@ -72,7 +76,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
}
assertEquals(size, histogramWrapper.getStatistics().size());
- assertEquals((size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
+ assertEquals((size - 1) / 2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
for (int i = size; i < 2 * size; i++) {
histogramWrapper.update(i);
@@ -83,7 +87,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
}
assertEquals(size, histogramWrapper.getStatistics().size());
- assertEquals(size + (size - 1)/2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
+ assertEquals(size + (size - 1) / 2.0, histogramWrapper.getStatistics().getQuantile(0.5), 0.001);
}
/**
@@ -153,6 +157,9 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
}
}
+ /**
+ * Test reporter.
+ */
public static class TestingReporter extends ScheduledDropwizardReporter {
TestingScheduledReporter scheduledReporter = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
index 0b8fa52..baab773 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
@@ -25,6 +25,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for the DropwizardMeterWrapper.
+ */
public class DropwizardMeterWrapperTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
index b6389c5..2298e0f 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
@@ -20,12 +20,16 @@ package org.apache.flink.dropwizard.metrics;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.util.TestMeter;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+/**
+ * Tests for the FlinkMeterWrapper.
+ */
public class FlinkMeterWrapperTest {
private static final double DELTA = 0.0001;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
index de9da74..8719901 100644
--- a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
+++ b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java
@@ -18,20 +18,23 @@
package org.apache.flink.metrics.ganglia;
-import com.codahale.metrics.ScheduledReporter;
-
-import info.ganglia.gmetric4j.gmetric.GMetric;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
+import com.codahale.metrics.ScheduledReporter;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+/**
+ * This class acts as a factory for the {@link com.codahale.metrics.ganglia.GangliaReporter} and allows using it as a
+ * Flink reporter.
+ */
@PublicEvolving
public class GangliaReporter extends ScheduledDropwizardReporter {
-
+
public static final String ARG_DMAX = "dmax";
public static final String ARG_TMAX = "tmax";
public static final String ARG_TTL = "ttl";
@@ -72,7 +75,7 @@ public class GangliaReporter extends ScheduledDropwizardReporter {
builder.withTMax(tMax);
log.info("Configured GangliaReporter with {host:{}, port:{}, dmax:{}, tmax:{}, ttl:{}, addressingMode:{}}",
- host, port, dMax, tMax, ttl, addressingMode);
+ host, port, dMax, tMax, ttl, addressingMode);
return builder.build(gMetric);
} catch (IOException e) {
throw new RuntimeException("Error while instantiating GangliaReporter.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
index 47a9d87..3338af2 100644
--- a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
+++ b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java
@@ -18,16 +18,20 @@
package org.apache.flink.metrics.graphite;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.graphite.Graphite;
-
-import com.codahale.metrics.graphite.GraphiteUDP;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteUDP;
+
import java.util.concurrent.TimeUnit;
+/**
+ * This class acts as a factory for the {@link com.codahale.metrics.graphite.GraphiteReporter} and allows using it as a
+ * Flink reporter.
+ */
@PublicEvolving
public class GraphiteReporter extends ScheduledDropwizardReporter {
@@ -78,7 +82,7 @@ public class GraphiteReporter extends ScheduledDropwizardReporter {
log.info("Configured GraphiteReporter with {host:{}, port:{}, protocol:{}}", host, port, prot);
switch(prot) {
case UDP:
- return builder.build(new GraphiteUDP(host, port));
+ return builder.build(new GraphiteUDP(host, port));
case TCP:
default:
return builder.build(new Graphite(host, port));
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index f0c0fcb..1cc7d38 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.util.NetUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ import javax.management.ObjectName;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
+
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
@@ -57,7 +59,7 @@ import java.util.Map;
/**
* {@link MetricReporter} that exports {@link Metric Metrics} via JMX.
*
- * Largely based on the JmxReporter class of the dropwizard metrics library
+ * <p>Largely based on the JmxReporter class of the dropwizard metrics library
* https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
*/
public class JMXReporter implements MetricReporter {
@@ -77,18 +79,15 @@ public class JMXReporter implements MetricReporter {
// ------------------------------------------------------------------------
- /** The server where the management beans are registered and deregistered */
+ /** The server where the management beans are registered and deregistered. */
private final MBeanServer mBeanServer;
- /** The names under which the registered metrics have been added to the MBeanServer */
+ /** The names under which the registered metrics have been added to the MBeanServer. */
private final Map<Metric, ObjectName> registeredMetrics;
/** The server to which JMX clients connect to. ALlows for better control over port usage. */
private JMXServer jmxServer;
- /**
- * Creates a new JMXReporter
- */
public JMXReporter() {
this.mBeanServer = ManagementFactory.getPlatformMBeanServer();
this.registeredMetrics = new HashMap<>();
@@ -140,7 +139,7 @@ public class JMXReporter implements MetricReporter {
}
}
}
-
+
public int getPort() {
if (jmxServer == null) {
throw new NullPointerException("No server was opened. Did you specify a port?");
@@ -220,7 +219,7 @@ public class JMXReporter implements MetricReporter {
}
// ------------------------------------------------------------------------
- // Utilities
+ // Utilities
// ------------------------------------------------------------------------
static Hashtable<String, String> generateJmxTable(Map<String, String> variables) {
@@ -239,9 +238,9 @@ public class JMXReporter implements MetricReporter {
* Lightweight method to replace unsupported characters.
* If the string does not contain any unsupported characters, this method creates no
* new string (and in fact no new objects at all).
- *
+ *
* <p>Replacements:
- *
+ *
* <ul>
* <li>{@code "} is removed</li>
* <li>{@code space} is replaced by {@code _} (underscore)</li>
@@ -252,7 +251,7 @@ public class JMXReporter implements MetricReporter {
char[] chars = null;
final int strLen = str.length();
int pos = 0;
-
+
for (int i = 0; i < strLen; i++) {
final char c = str.charAt(i);
switch (c) {
@@ -271,7 +270,7 @@ public class JMXReporter implements MetricReporter {
}
chars[pos++] = '_';
break;
-
+
case ',':
case '=':
case ';':
@@ -292,18 +291,24 @@ public class JMXReporter implements MetricReporter {
pos++;
}
}
-
+
return chars == null ? str : new String(chars, 0, pos);
}
// ------------------------------------------------------------------------
- // Interfaces and base classes for JMX beans
+ // Interfaces and base classes for JMX beans
// ------------------------------------------------------------------------
+ /**
+ * The common MBean interface for all metrics.
+ */
public interface MetricMBean {}
private abstract static class AbstractBean implements MetricMBean {}
+ /**
+ * The MBean interface for an exposed counter.
+ */
public interface JmxCounterMBean extends MetricMBean {
long getCount();
}
@@ -321,6 +326,9 @@ public class JMXReporter implements MetricReporter {
}
}
+ /**
+ * The MBean interface for an exposed gauge.
+ */
public interface JmxGaugeMBean extends MetricMBean {
Object getValue();
}
@@ -339,6 +347,9 @@ public class JMXReporter implements MetricReporter {
}
}
+ /**
+ * The MBean interface for an exposed histogram.
+ */
public interface JmxHistogramMBean extends MetricMBean {
long getCount();
@@ -427,6 +438,9 @@ public class JMXReporter implements MetricReporter {
}
}
+ /**
+ * The MBean interface for an exposed meter.
+ */
public interface JmxMeterMBean extends MetricMBean {
double getRate();
@@ -455,9 +469,9 @@ public class JMXReporter implements MetricReporter {
/**
* JMX Server implementation that JMX clients can connect to.
*
- * Heavily based on j256 simplejmx project
+ * <p>Heavily based on j256 simplejmx project
*
- * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+ * <p>https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
*/
private static class JMXServer {
private Registry rmiRegistry;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 85ab897..f10769a 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.apache.flink.runtime.metrics.util.TestingHistogram;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import javax.management.MBeanAttributeInfo;
@@ -41,6 +42,7 @@ import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
+
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Hashtable;
@@ -51,6 +53,9 @@ import static org.apache.flink.metrics.jmx.JMXReporter.JMX_DOMAIN_PREFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the JMXReporter.
+ */
public class JMXReporterTest extends TestLogger {
@Test
@@ -140,7 +145,7 @@ public class JMXReporterTest extends TestLogger {
rep1.notifyOfRemovedMetric(g1, "rep1", null);
rep1.notifyOfRemovedMetric(g2, "rep2", null);
-
+
mg.close();
reg.shutdown();
}
@@ -194,7 +199,7 @@ public class JMXReporterTest extends TestLogger {
ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables()));
ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables()));
- JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep1).getPort() + "/jmxrmi");
+ JMXServiceURL url1 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep1).getPort() + "/jmxrmi");
JMXConnector jmxCon1 = JMXConnectorFactory.connect(url1);
MBeanServerConnection mCon1 = jmxCon1.getMBeanServerConnection();
@@ -203,7 +208,7 @@ public class JMXReporterTest extends TestLogger {
jmxCon1.close();
- JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter)rep2).getPort() + "/jmxrmi");
+ JMXServiceURL url2 = new JMXServiceURL("service:jmx:rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jndi/rmi://localhost:" + ((JMXReporter) rep2).getPort() + "/jmxrmi");
JMXConnector jmxCon2 = JMXConnectorFactory.connect(url2);
MBeanServerConnection mCon2 = jmxCon2.getMBeanServerConnection();
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 6d54db3..6b55eeb 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.configuration.ConfigConstants;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.Await;
@@ -38,6 +40,7 @@ import scala.concurrent.duration.FiniteDuration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.Set;
@@ -45,6 +48,9 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests to verify JMX reporter functionality on the JobManager.
+ */
public class JMXJobManagerMetricTest {
/**
* Tests that metrics registered on the JobManager are actually accessible via JMX.
@@ -102,6 +108,9 @@ public class JMXJobManagerMetricTest {
}
}
+ /**
+ * Utility to block/unblock a task.
+ */
public static class BlockingInvokable extends AbstractInvokable {
private static boolean blocking = true;
private static final Object lock = new Object();
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 113107f..527f9c1 100644
--- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -42,20 +42,19 @@ import java.util.Map;
import java.util.NoSuchElementException;
/**
- * Largely based on the StatsDReporter class by ReadyTalk
- * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ * Largely based on the StatsDReporter class by ReadyTalk.
*
- * Ported since it was not present in maven central.
+ * <p>https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java
+ *
+ * <p>Ported since it was not present in maven central.
*/
@PublicEvolving
public class StatsDReporter extends AbstractReporter implements Scheduled {
-
+
private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class);
public static final String ARG_HOST = "host";
public static final String ARG_PORT = "port";
-// public static final String ARG_CONVERSION_RATE = "rateConversion";
-// public static final String ARG_CONVERSION_DURATION = "durationConversion";
private boolean closed = false;
@@ -73,11 +72,6 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
this.address = new InetSocketAddress(host, port);
-// String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS");
-// String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS");
-// this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1);
-// this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1);
-
try {
this.socket = new DatagramSocket(0);
} catch (SocketException e) {
@@ -131,7 +125,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
}
// ------------------------------------------------------------------------
-
+
private void reportCounter(final String name, final Counter counter) {
send(name, String.valueOf(counter.getCount()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 9215c3f..7d063e7 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -25,10 +25,10 @@ import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
-import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import java.io.IOException;
@@ -53,6 +54,9 @@ import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the StatsDReporter.
+ */
public class StatsDReporterTest extends TestLogger {
@Test
@@ -124,7 +128,7 @@ public class StatsDReporterTest extends TestLogger {
}
/**
- * Tests that histograms are properly reported via the StatsD reporter
+ * Tests that histograms are properly reported via the StatsD reporter.
*/
@Test
public void testStatsDHistogramReporting() throws Exception {
@@ -198,7 +202,7 @@ public class StatsDReporterTest extends TestLogger {
}
/**
- * Tests that meters are properly reported via the StatsD reporter
+ * Tests that meters are properly reported via the StatsD reporter.
*/
@Test
public void testStatsDMetersReporting() throws Exception {
@@ -241,7 +245,6 @@ public class StatsDReporterTest extends TestLogger {
Set<String> lines = receiver.getLines();
-
assertEquals(expectedLines, lines);
} finally {
@@ -260,7 +263,7 @@ public class StatsDReporterTest extends TestLogger {
}
/**
- * Testing StatsDReporter which disables the socket creation
+ * Testing StatsDReporter which disables the socket creation.
*/
public static class TestingStatsDReporter extends StatsDReporter {
@Override
@@ -273,7 +276,7 @@ public class StatsDReporterTest extends TestLogger {
}
}
- public static class TestingHistogram implements Histogram {
+ private static class TestingHistogram implements Histogram {
@Override
public void update(long value) {
@@ -326,7 +329,7 @@ public class StatsDReporterTest extends TestLogger {
}
}
- public static class DatagramSocketReceiver implements Runnable {
+ private static class DatagramSocketReceiver implements Runnable {
private static final Object obj = new Object();
private final DatagramSocket socket;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce573c65/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index e1b66c2..3be21aa 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -60,4 +60,43 @@ under the License.
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
[6/8] flink git commit: [FLINK-6659] fix RocksDBMergeIteratorTest
leaving temporary data behind
Posted by ch...@apache.org.
[FLINK-6659] fix RocksDBMergeIteratorTest leaving temporary data behind
-> use a JUnit '@Rule' that does the cleanup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b58a420e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b58a420e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b58a420e
Branch: refs/heads/master
Commit: b58a420e3954f734192a7eff2e187349c4ed9959
Parents: dbcc456
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon May 22 16:47:35 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBMergeIteratorTest.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b58a420e/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 956ef2f..f5bcf86 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -21,16 +21,16 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -43,6 +43,9 @@ public class RocksDBMergeIteratorTest {
private static final int NUM_KEY_VAL_STATES = 50;
private static final int MAX_NUM_KEYS = 20;
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
@Test
public void testEmptyMergeIterator() throws IOException {
RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
@@ -51,19 +54,23 @@ public class RocksDBMergeIteratorTest {
}
@Test
- public void testMergeIterator() throws Exception {
+ public void testMergeIteratorByte() throws Exception {
Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE);
testMergeIterator(Byte.MAX_VALUE);
+ }
+
+ @Test
+ public void testMergeIteratorShort() throws Exception {
+ Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE);
+
testMergeIterator(Short.MAX_VALUE);
}
public void testMergeIterator(int maxParallelism) throws Exception {
Random random = new Random(1234);
- File tmpDir = CommonTestUtils.createTempDirectory();
-
- RocksDB rocksDB = RocksDB.open(tmpDir.getAbsolutePath());
+ RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath());
try {
List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
[4/8] flink git commit: [FLINK-6675] Activate strict checkstyle for
flink-annotations
Posted by ch...@apache.org.
[FLINK-6675] Activate strict checkstyle for flink-annotations
This closes #3970.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbcc456a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbcc456a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbcc456a
Branch: refs/heads/master
Commit: dbcc456a652e980323b1b23692578e3c22e25e68
Parents: 04fae53
Author: zentol <ch...@apache.org>
Authored: Tue May 23 13:47:53 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed May 24 16:46:03 2017 +0200
----------------------------------------------------------------------
flink-annotations/pom.xml | 38 ++++++++++++++++++++
.../org/apache/flink/annotation/Internal.java | 1 +
.../org/apache/flink/annotation/Public.java | 1 +
.../apache/flink/annotation/PublicEvolving.java | 1 +
.../flink/annotation/VisibleForTesting.java | 3 +-
5 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/pom.xml
----------------------------------------------------------------------
diff --git a/flink-annotations/pom.xml b/flink-annotations/pom.xml
index b375611..08e10f5 100644
--- a/flink-annotations/pom.xml
+++ b/flink-annotations/pom.xml
@@ -34,4 +34,42 @@ under the License.
<packaging>jar</packaging>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java b/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
index dd9f080..65231c2 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
@@ -16,6 +16,7 @@
* limitations under the License.
*
*/
+
package org.apache.flink.annotation;
import java.lang.annotation.Documented;
http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java b/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
index d891a0a..1e3bd09 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
@@ -16,6 +16,7 @@
* limitations under the License.
*
*/
+
package org.apache.flink.annotation;
import java.lang.annotation.Documented;
http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java b/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
index 0c093a2..3389e71 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/PublicEvolving.java
@@ -16,6 +16,7 @@
* limitations under the License.
*
*/
+
package org.apache.flink.annotation;
import java.lang.annotation.Documented;
http://git-wip-us.apache.org/repos/asf/flink/blob/dbcc456a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
----------------------------------------------------------------------
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
index 8f945a9..bed7db0 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/VisibleForTesting.java
@@ -16,6 +16,7 @@
* limitations under the License.
*
*/
+
package org.apache.flink.annotation;
import java.lang.annotation.Documented;
@@ -25,7 +26,7 @@ import java.lang.annotation.Target;
/**
* This annotations declares that a function, field, constructor, or entire type, is only visible for
* testing purposes.
- *
+ *
* <p>This annotation is typically attached when for example a method should be {@code private}
* (because it is not intended to be called externally), but cannot be declared private, because
* some tests need to have access to it.