Posted to issues@flink.apache.org by asicoe <gi...@git.apache.org> on 2017/09/26 08:30:36 UTC
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
GitHub user asicoe opened a pull request:
https://github.com/apache/flink/pull/4725
[FLINK-7689] [Streaming Connectors] Added metrics to JDBCOutputFormat in order to be able to m…
Added metrics to JDBCOutputFormat in order to be able to measure jdbc batch flush rate, latency and size.
## Contribution Checklist
- Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
- Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
- Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
- Each pull request should address only one issue, not mix up code from multiple issues.
- Each commit in the pull request has a meaningful commit message (including the JIRA id)
- Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
**(The sections below can be removed for hotfixes of typos)**
## What is the purpose of the change
This pull request adds some useful metrics to the flink-jdbc sink.
## Brief change log
- Added metrics to JDBCOutputFormat in order to be able to measure jdbc batch flush rate, latency and size.
## Verifying this change
This change added tests and can be verified as follows:
Check the Task Metrics section of the Flink UI for a job that uses the flink-jdbc and flink-table 1.4-SNAPSHOT dependencies, creates a JDBCAppendTableSink instance, and uses its emitDataStream method to write a stream of messages to a JDBC-enabled destination.
These metrics have been used successfully for the past month in a dev environment running a Flink streaming app, in conjunction with statsd/Graphite/Grafana.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/asicoe/flink FLINK-7689_instrument_jdbc_sink
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4725.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4725
----
commit c5baa1d4fb8947383c601817f84dc2e2cfb7f26a
Author: Alex Sicoe <al...@intechww.com>
Date: 2017-09-26T08:01:40Z
FLINK-7689 Added metrics to JDBCOutputFormat in order to be able to measure jdbc batch flush rate, latency and size.
----
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r192222854
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -41,6 +46,11 @@
public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;
+ static final String FLUSH_SCOPE = "flush";
+ static final String FLUSH_RATE_METER_NAME = "rate";
--- End diff --
I'm not convinced of the naming scheme. I would replace `FLUSH_SCOPE` with "jdbc", and explicitly prefix the rate and duration metrics with "flush".
---
[GitHub] flink issue #4725: [FLINK-7689] [Streaming Connectors] Added metrics to JDBC...
Posted by asicoe <gi...@git.apache.org>.
Github user asicoe commented on the issue:
https://github.com/apache/flink/pull/4725
Thanks for reviewing, Fabian. I have made the suggested changes. Please have a look. The build is failing for an unrelated issue.
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r192222445
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -82,6 +97,22 @@ public void open(int taskNumber, int numTasks) throws IOException {
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
+ this.flushMeter = getRuntimeContext()
+ .getMetricGroup()
+ .addGroup(FLUSH_SCOPE)
+ .meter(FLUSH_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
--- End diff --
These could be replaced with the built-in `MeterView`.
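As a rough, self-contained sketch of the idea behind `MeterView` (the class and method names below are illustrative stand-ins, not Flink's actual API): the rate is derived from a plain counter sampled at a fixed interval, so no Dropwizard dependency is needed:

```java
// Illustrative stand-in for Flink's MeterView mechanism: a meter whose rate is
// computed from a plain counter sampled periodically, instead of maintaining
// Dropwizard's exponentially decaying averages. Names here are hypothetical.
final class CounterBackedMeter {
    private long count;      // total events observed
    private long lastCount;  // count at the previous update
    private double rate;     // events per second over the last interval

    void markEvent() {
        count++;
    }

    // In Flink this would be driven by the metric system's periodic updater.
    void update(double intervalSeconds) {
        rate = (count - lastCount) / intervalSeconds;
        lastCount = count;
    }

    double getRate() {
        return rate;
    }
}
```

The practical upside is that the connector then only depends on Flink's own metric types rather than pulling in an extra metrics library.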
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r192222079
--- Diff: flink-connectors/flink-jdbc/pom.xml ---
@@ -59,5 +59,11 @@ under the License.
<version>10.10.1.1</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
--- End diff --
We should exclusively rely on built-in metrics.
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r192224177
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -207,13 +238,18 @@ public void writeRecord(Row row) throws IOException {
if (batchCount >= batchInterval) {
// execute batch
+ batchLimitReachedMeter.markEvent();
flush();
}
}
void flush() {
try {
+ flushMeter.markEvent();
+ flushBatchCountHisto.update(batchCount);
+ long before = System.currentTimeMillis();
upload.executeBatch();
+ flushDurationMsHisto.update(System.currentTimeMillis() - before);
--- End diff --
This may result in a negative duration.
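A common fix is to time the call with the monotonic `System.nanoTime()` instead of the wall clock, which can jump backwards under NTP adjustments. A minimal sketch, with `doWork` as a placeholder for `upload.executeBatch()`:

```java
// Sketch: use the monotonic clock for elapsed-time measurement so the recorded
// duration can never be negative. doWork is a stand-in for the batch execution.
final class FlushTimer {
    static long timeMillis(Runnable doWork) {
        long before = System.nanoTime();
        doWork.run();
        return (System.nanoTime() - before) / 1_000_000; // always >= 0
    }
}
```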
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r192222379
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -82,6 +97,22 @@ public void open(int taskNumber, int numTasks) throws IOException {
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
+ this.flushMeter = getRuntimeContext()
+ .getMetricGroup()
+ .addGroup(FLUSH_SCOPE)
+ .meter(FLUSH_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
+ this.batchLimitReachedMeter = getRuntimeContext()
+ .getMetricGroup()
+ .addGroup(FLUSH_SCOPE)
+ .meter(BATCH_LIMIT_REACHED_RATE_METER_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
+ this.flushDurationMsHisto = getRuntimeContext()
+ .getMetricGroup()
+ .addGroup(FLUSH_SCOPE)
+ .histogram(FLUSH_DURATION_HISTO_NAME, new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram(new ExponentiallyDecayingReservoir())));
--- End diff --
I recommend staying away from histograms as long as possible. Most metric backends recommend _not_ building histograms in the application and letting the backend handle it instead.
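One way to follow that advice (a hedged sketch; the field and method names are illustrative, not Flink's API) is to expose only primitive monotonic values, such as a running total and a count, and let the metrics backend derive averages or percentiles:

```java
// Sketch: instead of an in-application histogram, expose plain monotonic values
// (total flush time and flush count) and let the metrics backend compute
// averages, rates, or percentiles from them. Names are illustrative.
final class FlushStats {
    private long totalFlushMillis; // suitable for exposing via a Gauge
    private long flushCount;       // suitable for exposing via a Counter

    void recordFlush(long durationMillis) {
        totalFlushMillis += durationMillis;
        flushCount++;
    }

    long getTotalFlushMillis() { return totalFlushMillis; }
    long getFlushCount() { return flushCount; }

    // A backend would typically compute this; shown here only for clarity.
    double meanFlushMillis() {
        return flushCount == 0 ? 0.0 : (double) totalFlushMillis / flushCount;
    }
}
```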
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r192223629
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -207,13 +238,18 @@ public void writeRecord(Row row) throws IOException {
if (batchCount >= batchInterval) {
// execute batch
+ batchLimitReachedMeter.markEvent();
--- End diff --
This seems redundant given that `flushMeter` exists. While the job is running, `batchLimit == flushMeter`, and at the end `batchLimit == flushMeter - 1`, except in the exceedingly rare case that the total number of rows fits perfectly into the batches.
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r142919779
--- Diff: flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---
@@ -233,6 +255,30 @@ public void testFlush() throws SQLException, IOException {
}
}
+ @Test
+ public void testMetricsSetup() throws IOException {
--- End diff --
Can you extend this test to check that the metrics are correctly set (except for the `durationMs` histogram)?
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r142918146
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -41,6 +46,11 @@
public class JDBCOutputFormat extends RichOutputFormat<Row> {
private static final long serialVersionUID = 1L;
static final int DEFAULT_BATCH_INTERVAL = 5000;
+ static final String FLUSH_SCOPE = "flush";
+ static final String FLUSH_RATE_METER_NAME = "rate";
+ static final String FLUSH_RATE_GR_BATCH_INT_METER_NAME = "rateGreaterThanBatchInterval";
--- End diff --
rename to `batchLimitReachedRate`?
---
[GitHub] flink pull request #4725: [FLINK-7689] [Streaming Connectors] Added metrics ...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4725#discussion_r192222532
--- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---
@@ -58,6 +68,11 @@
private int[] typesArray;
+ private Meter batchLimitReachedMeter;
+ private Meter flushMeter;
--- End diff --
These could be initialized in the constructor and made final.
---
[GitHub] flink issue #4725: [FLINK-7689] [Streaming Connectors] Added metrics to JDBC...
Posted by pabloem <gi...@git.apache.org>.
Github user pabloem commented on the issue:
https://github.com/apache/flink/pull/4725
@fhueske @asicoe is this PR still current / ongoing? I'm willing to drive it to the end if there's anything left to do.. : ) - Or perhaps it's almost ready to merge?
---