Posted to reviews@bahir.apache.org by yew1eb <gi...@git.apache.org> on 2017/08/28 04:14:46 UTC

[GitHub] bahir-flink pull request #21: [BAHIR-134] Add InfluxDB sink for flink stream

GitHub user yew1eb opened a pull request:

    https://github.com/apache/bahir-flink/pull/21

    [BAHIR-134] Add InfluxDB sink for flink stream

    add InfluxDBSink for flink stream
    
    ## Verifying this change
    Add example `InfluxDBSinkExample`
    Running environment:
    flink version: 1.3.0
    influxdb version: 1.3.4
    influxdb-java (InfluxDB client) version: 2.7 (compatible with InfluxDB 0.9 to 1.3.x)
    
    It works well.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yew1eb/bahir-flink BAHIR-134

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir-flink/pull/21.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21
    
----
commit 1249a8c26fa261719431a269f4b46203e748774f
Author: zhouhai02 <zh...@meituan.com>
Date:   2017-08-27T11:35:31Z

    add InfluxDb sink for flink stream

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] bahir-flink issue #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by yew1eb <gi...@git.apache.org>.
Github user yew1eb commented on the issue:

    https://github.com/apache/bahir-flink/pull/21
  
    Hi @rmetzger, thanks for your review. I have updated the PR.
    Best,
    Hai Zhou
    :beers:


---

[GitHub] bahir-flink issue #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/bahir-flink/pull/21
  
    As of Spark 2.2, Java 7 support has been removed.
    I think Flink also requires Java 8, right?
    
    Perhaps we can do the same for bahir


---

[GitHub] bahir-flink issue #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/21
  
    Overall, I like the change.
    The only problem is that it breaks the build because of missing Java 7 support.
    
    I guess we need to decide whether we want to drop java 7 support.


---

[GitHub] bahir-flink pull request #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by yew1eb <gi...@git.apache.org>.
Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/21#discussion_r139290380
  
    --- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.influxdb;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.util.CollectionUtil;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +import org.influxdb.InfluxDB;
    +import org.influxdb.InfluxDBFactory;
    +import org.influxdb.dto.Point;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Sink to save data into a InfluxDB cluster.
    + */
    +public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> {
    +
    +    private transient InfluxDB influxDB = null;
    +    private final String dbName;
    +    private final String username;
    +    private final String password;
    +    private final String host;
    +    private boolean batchEnabled = true;
    +
    +    /**
    +     * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
    +     *
    +     * @param host     the url to connect to.
    +     * @param username the username which is used to authorize against the influxDB instance.
    +     * @param password the password for the username which is used to authorize against the influxDB instance.
    +     * @param dbName   the database to write to.
    +     */
    +    public InfluxDBSink(String host, String username, String password, String dbName) {
    +        this.host = Preconditions.checkNotNull(host, "host can not be null");
    +        this.username = Preconditions.checkNotNull(username, "username can not be null");
    +        this.password = Preconditions.checkNotNull(password, "password can not be null");
    +        this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
    +    }
    +
    +    public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
    +        this(host, username, password, dbName);
    +        this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
    +    }
    +
    +    /**
    +     * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
    +     */
    +    @Override
    +    public void open(Configuration parameters) throws Exception {
    +        super.open(parameters);
    +
    +        influxDB = InfluxDBFactory.connect(host, username, password);
    +        if (!influxDB.databaseExists(dbName)) {
    +            influxDB.createDatabase(dbName);
    --- End diff --
    
    Thank you for the review.
    I will throw a `RuntimeException` instead of creating the database on the user's behalf,
    because the `open` method is executed by every parallel sink task.
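
For illustration only, the fail-fast check described in this comment might look like the sketch below. It is not the code that was merged: `DatabaseClient` is a hypothetical stand-in for the one `databaseExists` call used in the diff, and the class name, method name, and exception message are assumptions.

```java
import java.util.Collections;
import java.util.Set;

public class FailFastOpenSketch {

    /** Hypothetical stand-in for the single client call this sketch needs. */
    interface DatabaseClient {
        boolean databaseExists(String name);
    }

    /**
     * Throws instead of creating the database. open() runs once per parallel
     * sink task, so creating the database there would be attempted by every task.
     */
    static void verifyDatabaseExists(DatabaseClient client, String dbName) {
        if (!client.databaseExists(dbName)) {
            throw new RuntimeException("Database '" + dbName + "' does not exist");
        }
    }

    public static void main(String[] args) {
        // A fake server that only knows one database (assumed name).
        Set<String> existing = Collections.singleton("metrics");
        DatabaseClient client = existing::contains;

        verifyDatabaseExists(client, "metrics"); // passes silently

        try {
            verifyDatabaseExists(client, "missing");
        } catch (RuntimeException e) {
            System.out.println("open() would fail: " + e.getMessage());
        }
    }
}
```

With this approach the user creates the database ahead of time, and a misconfigured job fails at startup rather than part-way through writing.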


---

[GitHub] bahir-flink pull request #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/21#discussion_r139234041
  
    --- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.influxdb;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.util.CollectionUtil;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +import org.influxdb.InfluxDB;
    +import org.influxdb.InfluxDBFactory;
    +import org.influxdb.dto.Point;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Sink to save data into a InfluxDB cluster.
    + */
    +public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> {
    +
    +    private transient InfluxDB influxDB = null;
    +    private final String dbName;
    +    private final String username;
    +    private final String password;
    +    private final String host;
    +    private boolean batchEnabled = true;
    +
    +    /**
    +     * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
    +     *
    +     * @param host     the url to connect to.
    +     * @param username the username which is used to authorize against the influxDB instance.
    +     * @param password the password for the username which is used to authorize against the influxDB instance.
    +     * @param dbName   the database to write to.
    +     */
    +    public InfluxDBSink(String host, String username, String password, String dbName) {
    +        this.host = Preconditions.checkNotNull(host, "host can not be null");
    +        this.username = Preconditions.checkNotNull(username, "username can not be null");
    +        this.password = Preconditions.checkNotNull(password, "password can not be null");
    +        this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
    +    }
    +
    +    public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
    +        this(host, username, password, dbName);
    +        this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
    +    }
    +
    +    /**
    +     * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
    +     */
    +    @Override
    +    public void open(Configuration parameters) throws Exception {
    +        super.open(parameters);
    +
    +        influxDB = InfluxDBFactory.connect(host, username, password);
    +        if (!influxDB.databaseExists(dbName)) {
    +            influxDB.createDatabase(dbName);
    +        }
    +
    +        if (batchEnabled) {
    +            // Flush every 2000 Points, at least every 100ms
    +            influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
    --- End diff --
    
    Ideally we'll make this configurable.
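
One way the batch settings could be made configurable is sketched below, purely as an illustration. `BatchSettingsSketch` and its accessors are hypothetical names that do not appear in the PR; only the default values (2000 points, 100 ms) come from the diff above.

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for the values that the diff hard-codes in enableBatch(...). */
public class BatchSettingsSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final int batchActions;
    private final int flushDuration;
    private final TimeUnit flushDurationUnit;

    public BatchSettingsSketch(int batchActions, int flushDuration, TimeUnit flushDurationUnit) {
        if (batchActions <= 0) {
            throw new IllegalArgumentException("batchActions must be positive");
        }
        if (flushDuration <= 0) {
            throw new IllegalArgumentException("flushDuration must be positive");
        }
        if (flushDurationUnit == null) {
            throw new IllegalArgumentException("flushDurationUnit must not be null");
        }
        this.batchActions = batchActions;
        this.flushDuration = flushDuration;
        this.flushDurationUnit = flushDurationUnit;
    }

    /** The values currently hard-coded in the diff: 2000 points, 100 ms. */
    public static BatchSettingsSketch defaults() {
        return new BatchSettingsSketch(2000, 100, TimeUnit.MILLISECONDS);
    }

    public int getBatchActions() { return batchActions; }
    public int getFlushDuration() { return flushDuration; }
    public TimeUnit getFlushDurationUnit() { return flushDurationUnit; }

    public static void main(String[] args) {
        BatchSettingsSketch settings = new BatchSettingsSketch(500, 1, TimeUnit.SECONDS);
        System.out.println(settings.getBatchActions() + " points or "
                + settings.getFlushDuration() + " " + settings.getFlushDurationUnit());
    }
}
```

Under this assumption the sink would accept such an object in its constructor and pass the three values to `influxDB.enableBatch(...)` in `open`, in place of the literals. The class is `Serializable` because Flink ships sink functions, including their fields, to the task managers.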


---

[GitHub] bahir-flink pull request #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/bahir-flink/pull/21


---

[GitHub] bahir-flink issue #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/21
  
    Flink 1.3, the current release, still supports Java 7. Starting with Flink 1.4, Java 7 support will be dropped.
    I'll start a quick discussion on the dev@ list to make sure nobody disagrees.


---

[GitHub] bahir-flink pull request #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/21#discussion_r139233921
  
    --- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.influxdb;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.util.CollectionUtil;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +import org.influxdb.InfluxDB;
    +import org.influxdb.InfluxDBFactory;
    +import org.influxdb.dto.Point;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Sink to save data into a InfluxDB cluster.
    + */
    +public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> {
    +
    +    private transient InfluxDB influxDB = null;
    +    private final String dbName;
    +    private final String username;
    +    private final String password;
    +    private final String host;
    +    private boolean batchEnabled = true;
    +
    +    /**
    +     * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
    +     *
    +     * @param host     the url to connect to.
    +     * @param username the username which is used to authorize against the influxDB instance.
    +     * @param password the password for the username which is used to authorize against the influxDB instance.
    +     * @param dbName   the database to write to.
    +     */
    +    public InfluxDBSink(String host, String username, String password, String dbName) {
    +        this.host = Preconditions.checkNotNull(host, "host can not be null");
    +        this.username = Preconditions.checkNotNull(username, "username can not be null");
    +        this.password = Preconditions.checkNotNull(password, "password can not be null");
    +        this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
    +    }
    +
    +    public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
    +        this(host, username, password, dbName);
    +        this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
    +    }
    +
    +    /**
    +     * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
    +     */
    +    @Override
    +    public void open(Configuration parameters) throws Exception {
    +        super.open(parameters);
    +
    +        influxDB = InfluxDBFactory.connect(host, username, password);
    +        if (!influxDB.databaseExists(dbName)) {
    +            influxDB.createDatabase(dbName);
    --- End diff --
    
    Maybe for the future: I would actually log an info message stating that you've created the database.
    I think it's good if code always tells the user when it is doing magic.
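
A minimal sketch of this suggestion follows, assuming a hypothetical `DatabaseAdmin` interface in place of the real client and using `java.util.logging` only to keep the example self-contained. The logging framework and message wording are assumptions, not what the PR uses.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

public class LoggedCreateSketch {

    private static final Logger LOG = Logger.getLogger(LoggedCreateSketch.class.getName());

    /** Hypothetical stand-in for the two client calls used in the diff. */
    interface DatabaseAdmin {
        boolean databaseExists(String name);
        void createDatabase(String name);
    }

    /** Creates the database if it is missing and says so; returns true if it was created. */
    static boolean ensureDatabase(DatabaseAdmin admin, String dbName) {
        if (admin.databaseExists(dbName)) {
            return false;
        }
        admin.createDatabase(dbName);
        // Tell the user that something was done on their behalf.
        LOG.info("Database '" + dbName + "' did not exist and was created by the sink");
        return true;
    }

    public static void main(String[] args) {
        // In-memory fake server, for demonstration only.
        final Set<String> databases = new HashSet<>();
        DatabaseAdmin admin = new DatabaseAdmin() {
            @Override
            public boolean databaseExists(String name) { return databases.contains(name); }
            @Override
            public void createDatabase(String name) { databases.add(name); }
        };

        System.out.println(ensureDatabase(admin, "metrics")); // created on the first call
        System.out.println(ensureDatabase(admin, "metrics")); // already present on the second
    }
}
```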


---

[GitHub] bahir-flink issue #21: [BAHIR-134] Add InfluxDB sink for flink stream

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/21
  
    Thanks. I'll merge the change.


---