You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/22 10:34:57 UTC

[GitHub] [incubator-seatunnel] 531651225 opened a new pull request, #2499: Phoenix connector sink

531651225 opened a new pull request, #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   https://github.com/apache/incubator-seatunnel/issues/1946 Add phoenix sink connector 
   support  thin driver  client and Thick driver client 
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [x] Code changed are covered with tests, or it does not need tests for reason:
   * [x] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r954442058


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/phoenix/fakesource_to_phoenix.conf:
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        result_table_name = "fake"
+        field_name = "name, age"

Review Comment:
   Use new config style? reference #2406 
   
   
   explame:
   ```xml
   fields {
       name = string
       age = int
   }
   ``



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/phoenix/fakesource_to_phoenix.conf:
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        result_table_name = "fake"
+        field_name = "name, age"
+    }
+
+    # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+              sql = "select name, age from fake"
+            }

Review Comment:
   check style: indent



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/phoenix/FakeSourceToIoPhoenixIT.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.phoenix;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoPhoenixIT extends SparkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";
+    private static final int PHOENIX_PORT = 8765;
+    private static final String PHOENIX_CONNECT_URL = "jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF";
+    private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
+
+    private GenericContainer<?> phoenixServer;
+
+    @BeforeEach
+    public void startPhoenixContainer() throws Exception {
+        phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(PHOENIX_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        phoenixServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:8765", PHOENIX_PORT)));
+        Startables.deepStart(Stream.of(phoenixServer)).join();
+        log.info("phoenix container started");
+        // wait for phoenix fully start
+        Thread.sleep(5000L);

Review Comment:
   Use awaitility ?
   
   case:
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java#L78



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/phoenix/fakesource_to_phoenix.conf:
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        result_table_name = "fake"
+        field_name = "name, age"
+    }
+
+    # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+              sql = "select name, age from fake"
+            }
+
+    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+     Phoenix {
+
+            connect_url = "jdbc:phoenix:thin:url=http://flink_e2e_phoenix_sink:8765;serialization=PROTOBUF"

Review Comment:
   flink -> spark ?



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/phoenix/fakesource_to_phoenix.conf:
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+    #execution.checkpoint.interval = 10000
+    #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+    FakeSource {
+          result_table_name = "fake"
+          field_name = "name, age"
+    }
+
+    # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+          sql = "select name, age from fake"
+        }

Review Comment:
   Use new config style? reference #2406 
   
   
   explame:
   ```xml
   fields {
       name = string
       age = int
   }
   ``



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/phoenix/FakeSourceToIoPhoenixIT.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.phoenix;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoPhoenixIT extends SparkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";

Review Comment:
   flink -> spark ?



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/phoenix/FakeSourceToIoPhoenixIT.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.phoenix;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoPhoenixIT extends SparkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";
+    private static final int PHOENIX_PORT = 8765;

Review Comment:
   port conflict when flink、spark e2e-testcase concurrent running



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/phoenix/FakeSourceToPhoenixIT.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.phoenix;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToPhoenixIT extends FlinkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";
+    private static final int PHOENIX_PORT = 8765;
+    private static final String PHOENIX_CONNECT_URL = "jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF";
+    private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
+
+    private GenericContainer<?> phoenixServer;
+
+    @BeforeEach
+    public void startPhoenixContainer() throws Exception {
+        phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(PHOENIX_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        phoenixServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:8765", PHOENIX_PORT)));
+        Startables.deepStart(Stream.of(phoenixServer)).join();
+        log.info("phoenix container started");
+        // wait for phoenix fully start
+        Thread.sleep(5000L);

Review Comment:
   Use awaitility ?
   
   case:
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java#L78



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956750695


##########
docs/en/connector-v2/sink/Phoenix.md:
##########
@@ -0,0 +1,41 @@
+# Phoenix
+
+> Phoenix sink connector
+
+## Description
+Write Phoenix data through [Jdbc connector](Jdbc.md).
+Support Batch mode and Streaming mode. The tested Phoenix version is 4.xx and 5.xx
+On the underlying implementation, through the jdbc driver of Phoenix, execute the upsert statement to write data to HBase.
+Two ways of connecting Phoenix with Java JDBC. One is to connect to zookeeper through JDBC, and the other is to connect to queryserver through JDBC thin client.
+
+> Tips: By default, the (thin) driver jar is used. If you want to use the (thick) driver  or other versions of Phoenix (thin) driver, you need to recompile the jdbc connector module
+
+> Tips: Not support exactly-once semantics (XA transaction is not yet supported in Greenplum database).
+
+## Options
+
+### driver [string]
+if you use  phoenix (thick) driver the value is  `org.apache.phoenix.jdbc.PhoenixDriver` or you use (thin) driver  the value is `org.apache.phoenix.queryserver.client.Driver`
+
+### url [string]
+if you use  phoenix (thick) driver the value is  `jdbc:phoenix:localhost:2182/hbase` or you use (thin) driver  the value is `jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF`

Review Comment:
   > update `if you use<space><space>phoenix` to `if you use<space>phoenix`
   
   thanks,done
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #2499: Phoenix connector sink

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1222189993

   Please update the pr title and add the `[Connector-V2]` label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956660900


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+@AutoService(JdbcDialectFactory.class)
+public class PhoenixDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        // Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver
+        return url.startsWith("jdbc:phoenix:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new MysqlDialect();

Review Comment:
   > Since I haven't used Phoenix, I'm not quite sure that Phoenix can use the mysql dialect directly? For example, the Time and Blob types of Phoenix consistent with MySQL?
   Thanks for your suggestion. I re-implemented PhoenixTypeMapper.
   Refer to the https://phoenix.apache.org/language/datatypes.html.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956744400


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcPhoenixIT extends FlinkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+
+    private static final String PHOENIX_CONTAINER_HOST = "flink_e2e_phoenix_sink";
+    private static final String PHOENIX_HOST = "localhost";
+
+    private static final int PHOENIX_PORT = 8764;
+    private static final int PHOENIX_CONTAINER_PORT = 8765;
+
+    private static final String PHOENIX_CONNECT_URL = String.format("jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF", PHOENIX_HOST, PHOENIX_PORT);
+    private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
+
+    private GenericContainer<?> phoenixServer;
+
+    private Connection connection;
+
+    @BeforeEach
+    public void startPhoenixContainer() throws ClassNotFoundException, SQLException {
+        phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(PHOENIX_CONTAINER_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        phoenixServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(phoenixServer)).join();
+        initializeJdbcConnection();
+        log.info("phoenix container started");
+        initializePhoenixTable();
+        batchInsertData();
+    }
+
+    @Test
+    public void testJdbcPhoenixSourceAndSink() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_phoenix_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // query result
+        String sql = "select age, name from test.sink order by age asc";
+        List<List> result = new ArrayList<>();
+        try (Statement statement = connection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+        }
+        Assertions.assertIterableEquals(generateTestDataset(), result);
+    }
+
+    private void initializeJdbcConnection() throws SQLException, ClassNotFoundException {
+        Class.forName(PHOENIX_JDBC_DRIVER);
+        connection = DriverManager.getConnection(PHOENIX_CONNECT_URL);
+    }
+
+    private void initializePhoenixTable() {
+        try  {
+            Statement statement = connection.createStatement();
+            String createSource = "create table test.source(\n" +
+                    " name VARCHAR PRIMARY KEY,\n" +
+                    " age INTEGER)";
+            String createSink = "create table test.sink(\n" +
+                    " name VARCHAR PRIMARY KEY,\n" +
+                    " age INTEGER)";

Review Comment:
   > Can you test more data types? reference to PhoenixTypeMapper
   
   added more phoenix data types in testcase and these types different from MySQL 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1226798739

   > > > Can phoenix drivers be loaded using jdbc connector?
   > > 
   > > 
   > > Can we still use this method ? Considering that more improvements will be added later, is this more flexible and less conflict
   > 
   > case #2429
   
   Reuse JDBC connectors. Does the Phoenix connector need to be added to the connector JDBC module


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1229484396

   > Does Phoenix support XA transactions? If unsupported then add note to documentation
   
   i have added note to documentation in  new commit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956722750


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcPhoenixIT extends FlinkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+
+    private static final String PHOENIX_CONTAINER_HOST = "flink_e2e_phoenix_sink";
+    private static final String PHOENIX_HOST = "localhost";
+
+    private static final int PHOENIX_PORT = 8764;
+    private static final int PHOENIX_CONTAINER_PORT = 8765;
+
+    private static final String PHOENIX_CONNECT_URL = String.format("jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF", PHOENIX_HOST, PHOENIX_PORT);
+    private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
+
+    private GenericContainer<?> phoenixServer;
+
+    private Connection connection;
+
+    @BeforeEach
+    public void startPhoenixContainer() throws ClassNotFoundException, SQLException {
+        phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(PHOENIX_CONTAINER_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        phoenixServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(phoenixServer)).join();
+        initializeJdbcConnection();
+        log.info("phoenix container started");
+        initializePhoenixTable();
+        batchInsertData();
+    }
+
+    @Test
+    public void testJdbcPhoenixSourceAndSink() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_phoenix_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // query result
+        String sql = "select age, name from test.sink order by age asc";
+        List<List> result = new ArrayList<>();
+        try (Statement statement = connection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+        }
+        Assertions.assertIterableEquals(generateTestDataset(), result);
+    }
+
+    private void initializeJdbcConnection() throws SQLException, ClassNotFoundException {
+        Class.forName(PHOENIX_JDBC_DRIVER);
+        connection = DriverManager.getConnection(PHOENIX_CONNECT_URL);
+    }
+
+    private void initializePhoenixTable() {
+        try  {
+            Statement statement = connection.createStatement();
+            String createSource = "create table test.source(\n" +
+                    " name VARCHAR PRIMARY KEY,\n" +
+                    " age INTEGER)";
+            String createSink = "create table test.sink(\n" +
+                    " name VARCHAR PRIMARY KEY,\n" +
+                    " age INTEGER)";

Review Comment:
   Can you test more data types? reference to PhoenixTypeMapper



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r951645510


##########
seatunnel-connectors-v2/connector-phoenix/src/main/java/org/apache/seatunnel/connectors/seatunnel/phoenix/client/PhoenixStatementExecutor.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.phoenix.client;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.config.PhoenixSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.constant.Constant;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.constant.NullModeType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+public class PhoenixStatementExecutor {
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixStatementExecutor.class);
+    private final PhoenixSinkConfig phoenixWriteConfig;
+    private final List<SeaTunnelRow> batch;
+    private transient PreparedStatement st;
+    private int[] columnTypes;
+
+    public PhoenixStatementExecutor(
+            PhoenixSinkConfig phoenixWriteConfig, int[] columnTypes) {
+        this.phoenixWriteConfig = phoenixWriteConfig;
+        this.batch = new ArrayList<>();
+        this.columnTypes = columnTypes;
+    }
+
+    public void prepareStatements(Connection connection) throws SQLException {
+        this.st = createPreparedStatement(connection);
+    }
+
+    public void addToBatch(SeaTunnelRow record) {
+        batch.add(record);
+    }
+
+    public void executeBatch() throws SQLException {
+        if (!batch.isEmpty()) {
+            for (SeaTunnelRow record : batch) {
+                setupStatement(st, record);
+                st.addBatch();
+            }
+            st.executeBatch();
+            batch.clear();
+        }
+    }
+
+    public void closeStatements() throws SQLException {
+        if (st != null) {
+            st.close();
+            st = null;
+        }
+    }
+
+    private PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
+        StringBuilder columnNamesBuilder = new StringBuilder();
+        if (phoenixWriteConfig.isThinClient()) {
+            for (String col : phoenixWriteConfig.getColumns()) {
+                // thin client does not use double quotes
+                columnNamesBuilder.append(col);
+                columnNamesBuilder.append(",");
+            }
+        } else {
+            for (String col : phoenixWriteConfig.getColumns()) {
+                // Column names use double quotes
+                columnNamesBuilder.append("\"");
+                columnNamesBuilder.append(col);
+                columnNamesBuilder.append("\"");
+                columnNamesBuilder.append(",");
+            }
+        }
+        // The table name uses double quotation marks
+        columnNamesBuilder.setLength(columnNamesBuilder.length() - 1);
+        String columnNames = columnNamesBuilder.toString();
+        // Generate upsert template
+        String tableName = phoenixWriteConfig.getTableName();
+        StringBuilder upsertBuilder = null;
+        if (phoenixWriteConfig.isThinClient()) {
+            upsertBuilder = new StringBuilder("upsert into " + tableName + " (" + columnNames + " ) values (");
+        } else {
+            // The table name uses double quotation marks
+            upsertBuilder = new StringBuilder("upsert into \"" + tableName + "\" (" + columnNames + " ) values (");
+        }
+        for (int i = 0; i < phoenixWriteConfig.getColumns().size(); i++) {
+            upsertBuilder.append("?,");
+        }
+        //Remove the extra comma at the end
+        upsertBuilder.setLength(upsertBuilder.length() - 1);
+        upsertBuilder.append(")");
+
+        String sql = upsertBuilder.toString();
+        PreparedStatement ps = connection.prepareStatement(sql);
+        LOG.debug("SQL template generated: " + sql);
+        return ps;
+    }
+
+    private void setupStatement(PreparedStatement upload, SeaTunnelRow row) throws SQLException {
+        if (columnTypes != null && columnTypes.length > 0 && columnTypes.length != row.getFields().length) {
+            LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+        }
+        List<Integer> sinkColumnsIndex = phoenixWriteConfig.getSinkColumnsIndexInRow();
+        if (columnTypes == null) {
+            // no types provided
+            for (int index = 0; index < sinkColumnsIndex.size(); index++) {
+                upload.setObject(index + 1, row.getFields()[index]);
+            }
+        } else {
+            // types provided
+            for (int i = 0; i < sinkColumnsIndex.size(); i++) {
+                int sqlType = columnTypes[i];
+                setupColumn(upload, i + 1, sqlType, row.getFields()[i]);
+            }
+        }
+    }
+
+    private void setupColumn(PreparedStatement ps, int pos, int sqlType, Object col) throws SQLException {
+        if (col != null) {
+            switch (sqlType) {
+                case Types.CHAR:
+                case Types.VARCHAR:
+                    ps.setString(pos, (String) col);
+                    break;
+
+                case Types.BINARY:
+                case Types.VARBINARY:
+                    ps.setBytes(pos, (byte[]) col);
+                    break;
+
+                case Types.BOOLEAN:
+                    ps.setBoolean(pos, (Boolean) col);
+                    break;
+
+                case Types.TINYINT:
+                case Constant.TYPE_UNSIGNED_TINYINT:
+                    ps.setByte(pos, ((Long) col).byteValue());
+                    break;
+
+                case Types.SMALLINT:
+                case Constant.TYPE_UNSIGNED_SMALLINT:
+                    ps.setShort(pos, ((Long) col).shortValue());
+                    break;
+
+                case Types.INTEGER:
+                case Constant.TYPE_UNSIGNED_INTEGER:
+                    ps.setInt(pos, ((Long) col).intValue());
+                    break;
+
+                case Types.BIGINT:
+                case Constant.TYPE_UNSIGNED_LONG:
+                    ps.setLong(pos, (Long) col);
+                    break;
+
+                case Types.FLOAT:
+                    ps.setFloat(pos, ((Double) col).floatValue());
+                    break;
+
+                case Types.DOUBLE:
+                    ps.setDouble(pos, (Double) col);
+                    break;
+
+                case Types.DECIMAL:
+                    ps.setBigDecimal(pos, (BigDecimal) col);
+                    break;
+
+                case Types.DATE:
+                case Constant.TYPE_UNSIGNED_DATE:
+                    ps.setDate(pos, new java.sql.Date(new Date((Long) col).getTime()));
+                    break;
+
+                case Types.TIME:
+                case Constant.TYPE_UNSIGNED_TIME:
+                    ps.setTime(pos, new java.sql.Time(new Date((Long) col).getTime()));
+                    break;
+
+                case Types.TIMESTAMP:
+                case Constant.TYPE_UNSIGNED_TIMESTAMP:
+                    ps.setTimestamp(pos, new java.sql.Timestamp(new Date((Long) col).getTime()));
+                    break;

Review Comment:
   The time object is processed as a Local* time class in Seatunnel.
   you can see https://github.com/apache/incubator-seatunnel/issues/2024 or source code of data type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r954512439


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/phoenix/FakeSourceToIoPhoenixIT.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.phoenix;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoPhoenixIT extends SparkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";
+    private static final int PHOENIX_PORT = 8765;

Review Comment:
   use new port?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r952175297


##########
seatunnel-connectors-v2/connector-phoenix/src/main/java/org/apache/seatunnel/connectors/seatunnel/phoenix/client/PhoenixOutputFormat.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.phoenix.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.config.PhoenixSinkConfig;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PhoenixOutputFormat {
+
+    private static final long serialVersionUID = 1L;

Review Comment:
   remove serialVersionUID ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956660981


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+@AutoService(JdbcDialectFactory.class)
+public class PhoenixDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        // Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver
+        return url.startsWith("jdbc:phoenix:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new MysqlDialect();

Review Comment:
   > Since I haven't used Phoenix, I'm not quite sure that Phoenix can use the mysql dialect directly? For example, the Time and Blob types of Phoenix consistent with MySQL?
   
   Thanks for your suggestion. I re-implemented PhoenixTypeMapper.
   Refer to the https://phoenix.apache.org/language/datatypes.html.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1226834187

   > > > > Can phoenix drivers be loaded using jdbc connector?
   > > > 
   > > > 
   > > > Can we still use this method ? Considering that more improvements will be added later, is this more flexible and less conflict
   > > 
   > > 
   > > case #2429
   > 
   > Reuse JDBC connectors. Does the Phoenix connector need to be added to the connector JDBC module
   
   
   You need add database dialect to here
   
   https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1224205001

   > Can you add connector-v2 e2e-testcase?
   > 
   > reference: https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-spark-connector-v2-e2e
   
   Ok, I will try to add it later. But the Phoenix Docker image is very large, I wonder if there is any problem


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1222192114

   > Please update the pr title and add the `[Connector-V2]` label.
   
   ok,done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r952745396


##########
seatunnel-connectors-v2/connector-phoenix/src/main/java/org/apache/seatunnel/connectors/seatunnel/phoenix/client/PhoenixOutputFormat.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.phoenix.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.config.PhoenixSinkConfig;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PhoenixOutputFormat {
+
+    private static final long serialVersionUID = 1L;

Review Comment:
   ok, i have removed in new commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1223442418

   Can you add connector-v2 e2e-testcase?
   
   reference:
   https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e
   https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-spark-connector-v2-e2e


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r954470721


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/phoenix/FakeSourceToPhoenixIT.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.phoenix;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToPhoenixIT extends FlinkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";
+    private static final int PHOENIX_PORT = 8765;
+    private static final String PHOENIX_CONNECT_URL = "jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF";
+    private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
+
+    private GenericContainer<?> phoenixServer;
+
+    @BeforeEach
+    public void startPhoenixContainer() throws Exception {
+        phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(PHOENIX_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        phoenixServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:8765", PHOENIX_PORT)));
+        Startables.deepStart(Stream.of(phoenixServer)).join();
+        log.info("phoenix container started");
+        // wait for phoenix fully start
+        Thread.sleep(5000L);

Review Comment:
   I've removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1229643272

   ![图片](https://user-images.githubusercontent.com/33744252/187106244-a03bc222-95ac-41c9-8e97-623c6126f4a3.png)
   The CodeQL/Analyze (Java) step in CI was cancelled,. I think the overall compilation of the project timed out. How to solve it? @ashulin @hailin0 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956497782


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/phoenix/FakeSourceToIoPhoenixIT.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.phoenix;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoPhoenixIT extends SparkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";
+    private static final int PHOENIX_PORT = 8765;

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r954470801


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/phoenix/fakesource_to_phoenix.conf:
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        result_table_name = "fake"
+        field_name = "name, age"
+    }
+
+    # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+              sql = "select name, age from fake"
+            }
+
+    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+     Phoenix {
+
+            connect_url = "jdbc:phoenix:thin:url=http://flink_e2e_phoenix_sink:8765;serialization=PROTOBUF"

Review Comment:
   I've fix it
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r954472578


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/phoenix/fakesource_to_phoenix.conf:
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+    #execution.checkpoint.interval = 10000
+    #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+    FakeSource {
+          result_table_name = "fake"
+          field_name = "name, age"
+    }
+
+    # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+          sql = "select name, age from fake"
+        }

Review Comment:
   i think i have fix it,Is there a command to check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1226790123

   > > Can phoenix drivers be loaded using jdbc connector?
   > 
   > Can we still use this method ? Considering that more improvements will be added later, is this more flexible and less conflict
   
   case https://github.com/apache/incubator-seatunnel/pull/2429


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956596900


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+@AutoService(JdbcDialectFactory.class)
+public class PhoenixDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        // Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver
+        return url.startsWith("jdbc:phoenix:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new MysqlDialect();

Review Comment:
   Since I haven't used Phoenix, I'm not quite sure that Phoenix can use the mysql dialect directly? For example, the Time and Blob types of Phoenix consistent with MySQL?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs merged pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
CalvinKirs merged PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956744439


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+@AutoService(JdbcDialectFactory.class)
+public class PhoenixDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        // Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver

Review Comment:
   i have fixed



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixTypeMapper.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class PhoenixTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcDialect.class);

Review Comment:
   i have fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1229623396

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1226709660

   Can phoenix drivers be loaded using jdbc connector?  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r954471018


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/phoenix/FakeSourceToIoPhoenixIT.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.phoenix;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoPhoenixIT extends SparkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";
+    private static final int PHOENIX_PORT = 8765;
+    private static final String PHOENIX_CONNECT_URL = "jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF";
+    private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
+
+    private GenericContainer<?> phoenixServer;
+
+    @BeforeEach
+    public void startPhoenixContainer() throws Exception {
+        phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(PHOENIX_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        phoenixServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:8765", PHOENIX_PORT)));
+        Startables.deepStart(Stream.of(phoenixServer)).join();
+        log.info("phoenix container started");
+        // wait for phoenix fully start
+        Thread.sleep(5000L);

Review Comment:
   I've fix it
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r954472357


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/phoenix/FakeSourceToIoPhoenixIT.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.phoenix;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoPhoenixIT extends SparkContainer {
+
+    private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
+    private static final String PHOENIX_HOST = "flink_e2e_phoenix_sink";
+    private static final int PHOENIX_PORT = 8765;

Review Comment:
   Docker image uses a fixed port, so the current solution is to comment out the E2E of spark



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r954471312


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/phoenix/fakesource_to_phoenix.conf:
##########
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        result_table_name = "fake"
+        field_name = "name, age"

Review Comment:
   I've used new config style in new commit
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1226685119

   > Can you add connector-v2 e2e-testcase?
   > 
   > reference: https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-spark-connector-v2-e2e
   
    E2E has been added and passed the test 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1226738663

   > Can phoenix drivers be loaded using jdbc connector?
   
    Can we still use this method ? Considering that more improvements will be added later, is this more flexible and less conflict


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#issuecomment-1229037835

   > > > > > Can phoenix drivers be loaded using jdbc connector?
   > > > > 
   > > > > 
   > > > > Can we still use this method ? Considering that more improvements will be added later, is this more flexible and less conflict
   > > > 
   > > > 
   > > > case #2429
   > > 
   > > 
   > > Reuse JDBC connectors. Does the Phoenix connector need to be added to the connector JDBC module
   > 
   > You need add database dialect to here
   > 
   > https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect
   
   
   
   > > > > > Can phoenix drivers be loaded using jdbc connector?
   > > > > 
   > > > > 
   > > > > Can we still use this method ? Considering that more improvements will be added later, is this more flexible and less conflict
   > > > 
   > > > 
   > > > case #2429
   > > 
   > > 
   > > Reuse JDBC connectors. Does the Phoenix connector need to be added to the connector JDBC module
   > 
   > You need add database dialect to here
   > 
   > https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect
   
   Thanks for your advice, I have re implemented the phoenix connector reuse  jdbc connector in new commit. and and spark and flink e2e  and pass the test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956747053


##########
docs/en/connector-v2/sink/Phoenix.md:
##########
@@ -0,0 +1,41 @@
+# Phoenix
+
+> Phoenix sink connector
+
+## Description
+Write Phoenix data through [Jdbc connector](Jdbc.md).
+Support Batch mode and Streaming mode. The tested Phoenix version is 4.xx and 5.xx
+On the underlying implementation, through the jdbc driver of Phoenix, execute the upsert statement to write data to HBase.
+Two ways of connecting Phoenix with Java JDBC. One is to connect to zookeeper through JDBC, and the other is to connect to queryserver through JDBC thin client.
+
+> Tips: By default, the (thin) driver jar is used. If you want to use the (thick) driver  or other versions of Phoenix (thin) driver, you need to recompile the jdbc connector module
+
+> Tips: Not support exactly-once semantics (XA transaction is not yet supported in Greenplum database).

Review Comment:
   update `Greenplum` to `Phoenix`?



##########
docs/en/connector-v2/sink/Phoenix.md:
##########
@@ -0,0 +1,41 @@
+# Phoenix
+
+> Phoenix sink connector
+
+## Description
+Write Phoenix data through [Jdbc connector](Jdbc.md).
+Support Batch mode and Streaming mode. The tested Phoenix version is 4.xx and 5.xx
+On the underlying implementation, through the jdbc driver of Phoenix, execute the upsert statement to write data to HBase.
+Two ways of connecting Phoenix with Java JDBC. One is to connect to zookeeper through JDBC, and the other is to connect to queryserver through JDBC thin client.
+
+> Tips: By default, the (thin) driver jar is used. If you want to use the (thick) driver  or other versions of Phoenix (thin) driver, you need to recompile the jdbc connector module
+
+> Tips: Not support exactly-once semantics (XA transaction is not yet supported in Greenplum database).
+
+## Options
+
+### driver [string]
+if you use  phoenix (thick) driver the value is  `org.apache.phoenix.jdbc.PhoenixDriver` or you use (thin) driver  the value is `org.apache.phoenix.queryserver.client.Driver`
+
+### url [string]
+if you use  phoenix (thick) driver the value is  `jdbc:phoenix:localhost:2182/hbase` or you use (thin) driver  the value is `jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF`

Review Comment:
   update `if you use<space><space>phoenix` to `if you use<space>phoenix`



##########
docs/en/connector-v2/source/Phoenix.md:
##########
@@ -0,0 +1,39 @@
+# Phoenix
+
+> Phoenix source connector
+
+## Description
+Read Phoenix data through [Jdbc connector](Jdbc.md).
+Support Batch mode and Streaming mode. The tested Phoenix version is 4.xx and 5.xx
+On the underlying implementation, through the jdbc driver of Phoenix, execute the upsert statement to write data to HBase.
+Two ways of connecting Phoenix with Java JDBC. One is to connect to zookeeper through JDBC, and the other is to connect to queryserver through JDBC thin client.
+
+> Tips: By default, the (thin) driver jar is used. If you want to use the (thick) driver  or other versions of Phoenix (thin) driver, you need to recompile the jdbc connector module
+
+## Options
+
+### driver [string]
+if you use  phoenix (thick) driver the value is  `org.apache.phoenix.jdbc.PhoenixDriver` or you use (thin) driver  the value is `org.apache.phoenix.queryserver.client.Driver`
+
+### url [string]
+if you use  phoenix (thick) driver the value is  `jdbc:phoenix:localhost:2182/hbase` or you use (thin) driver  the value is `jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF`

Review Comment:
   as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956750487


##########
docs/en/connector-v2/source/Phoenix.md:
##########
@@ -0,0 +1,39 @@
+# Phoenix
+
+> Phoenix source connector
+
+## Description
+Read Phoenix data through [Jdbc connector](Jdbc.md).
+Support Batch mode and Streaming mode. The tested Phoenix version is 4.xx and 5.xx
+On the underlying implementation, through the jdbc driver of Phoenix, execute the upsert statement to write data to HBase.
+Two ways of connecting Phoenix with Java JDBC. One is to connect to zookeeper through JDBC, and the other is to connect to queryserver through JDBC thin client.
+
+> Tips: By default, the (thin) driver jar is used. If you want to use the (thick) driver  or other versions of Phoenix (thin) driver, you need to recompile the jdbc connector module
+
+## Options
+
+### driver [string]
+if you use  phoenix (thick) driver the value is  `org.apache.phoenix.jdbc.PhoenixDriver` or you use (thin) driver  the value is `org.apache.phoenix.queryserver.client.Driver`
+
+### url [string]
+if you use  phoenix (thick) driver the value is  `jdbc:phoenix:localhost:2182/hbase` or you use (thin) driver  the value is `jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF`

Review Comment:
   thanks,done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r956719779


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixTypeMapper.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class PhoenixTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcDialect.class);

Review Comment:
   Use `LoggerFactory.getLogger(PhoenixTypeMapper.class)` ?



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+@AutoService(JdbcDialectFactory.class)
+public class PhoenixDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        // Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver

Review Comment:
   `greenplum`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r952175297


##########
seatunnel-connectors-v2/connector-phoenix/src/main/java/org/apache/seatunnel/connectors/seatunnel/phoenix/client/PhoenixOutputFormat.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.phoenix.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.config.PhoenixSinkConfig;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PhoenixOutputFormat {
+
+    private static final long serialVersionUID = 1L;

Review Comment:
   remove ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] 531651225 commented on a diff in pull request #2499: [Feature][Connector-V2] Add phoenix connector sink

Posted by GitBox <gi...@apache.org>.
531651225 commented on code in PR #2499:
URL: https://github.com/apache/incubator-seatunnel/pull/2499#discussion_r952754558


##########
seatunnel-connectors-v2/connector-phoenix/src/main/java/org/apache/seatunnel/connectors/seatunnel/phoenix/client/PhoenixStatementExecutor.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.phoenix.client;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.config.PhoenixSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.constant.Constant;
+import org.apache.seatunnel.connectors.seatunnel.phoenix.constant.NullModeType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+public class PhoenixStatementExecutor {
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixStatementExecutor.class);
+    private final PhoenixSinkConfig phoenixWriteConfig;
+    private final List<SeaTunnelRow> batch;
+    private transient PreparedStatement st;
+    private int[] columnTypes;
+
+    public PhoenixStatementExecutor(
+            PhoenixSinkConfig phoenixWriteConfig, int[] columnTypes) {
+        this.phoenixWriteConfig = phoenixWriteConfig;
+        this.batch = new ArrayList<>();
+        this.columnTypes = columnTypes;
+    }
+
+    public void prepareStatements(Connection connection) throws SQLException {
+        this.st = createPreparedStatement(connection);
+    }
+
+    public void addToBatch(SeaTunnelRow record) {
+        batch.add(record);
+    }
+
+    public void executeBatch() throws SQLException {
+        if (!batch.isEmpty()) {
+            for (SeaTunnelRow record : batch) {
+                setupStatement(st, record);
+                st.addBatch();
+            }
+            st.executeBatch();
+            batch.clear();
+        }
+    }
+
+    public void closeStatements() throws SQLException {
+        if (st != null) {
+            st.close();
+            st = null;
+        }
+    }
+
+    private PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
+        StringBuilder columnNamesBuilder = new StringBuilder();
+        if (phoenixWriteConfig.isThinClient()) {
+            for (String col : phoenixWriteConfig.getColumns()) {
+                // thin client does not use double quotes
+                columnNamesBuilder.append(col);
+                columnNamesBuilder.append(",");
+            }
+        } else {
+            for (String col : phoenixWriteConfig.getColumns()) {
+                // Column names use double quotes
+                columnNamesBuilder.append("\"");
+                columnNamesBuilder.append(col);
+                columnNamesBuilder.append("\"");
+                columnNamesBuilder.append(",");
+            }
+        }
+        // The table name uses double quotation marks
+        columnNamesBuilder.setLength(columnNamesBuilder.length() - 1);
+        String columnNames = columnNamesBuilder.toString();
+        // Generate upsert template
+        String tableName = phoenixWriteConfig.getTableName();
+        StringBuilder upsertBuilder = null;
+        if (phoenixWriteConfig.isThinClient()) {
+            upsertBuilder = new StringBuilder("upsert into " + tableName + " (" + columnNames + " ) values (");
+        } else {
+            // The table name uses double quotation marks
+            upsertBuilder = new StringBuilder("upsert into \"" + tableName + "\" (" + columnNames + " ) values (");
+        }
+        for (int i = 0; i < phoenixWriteConfig.getColumns().size(); i++) {
+            upsertBuilder.append("?,");
+        }
+        //Remove the extra comma at the end
+        upsertBuilder.setLength(upsertBuilder.length() - 1);
+        upsertBuilder.append(")");
+
+        String sql = upsertBuilder.toString();
+        PreparedStatement ps = connection.prepareStatement(sql);
+        LOG.debug("SQL template generated: " + sql);
+        return ps;
+    }
+
+    private void setupStatement(PreparedStatement upload, SeaTunnelRow row) throws SQLException {
+        if (columnTypes != null && columnTypes.length > 0 && columnTypes.length != row.getFields().length) {
+            LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+        }
+        List<Integer> sinkColumnsIndex = phoenixWriteConfig.getSinkColumnsIndexInRow();
+        if (columnTypes == null) {
+            // no types provided
+            for (int index = 0; index < sinkColumnsIndex.size(); index++) {
+                upload.setObject(index + 1, row.getFields()[index]);
+            }
+        } else {
+            // types provided
+            for (int i = 0; i < sinkColumnsIndex.size(); i++) {
+                int sqlType = columnTypes[i];
+                setupColumn(upload, i + 1, sqlType, row.getFields()[i]);
+            }
+        }
+    }
+
+    private void setupColumn(PreparedStatement ps, int pos, int sqlType, Object col) throws SQLException {
+        if (col != null) {
+            switch (sqlType) {
+                case Types.CHAR:
+                case Types.VARCHAR:
+                    ps.setString(pos, (String) col);
+                    break;
+
+                case Types.BINARY:
+                case Types.VARBINARY:
+                    ps.setBytes(pos, (byte[]) col);
+                    break;
+
+                case Types.BOOLEAN:
+                    ps.setBoolean(pos, (Boolean) col);
+                    break;
+
+                case Types.TINYINT:
+                case Constant.TYPE_UNSIGNED_TINYINT:
+                    ps.setByte(pos, ((Long) col).byteValue());
+                    break;
+
+                case Types.SMALLINT:
+                case Constant.TYPE_UNSIGNED_SMALLINT:
+                    ps.setShort(pos, ((Long) col).shortValue());
+                    break;
+
+                case Types.INTEGER:
+                case Constant.TYPE_UNSIGNED_INTEGER:
+                    ps.setInt(pos, ((Long) col).intValue());
+                    break;
+
+                case Types.BIGINT:
+                case Constant.TYPE_UNSIGNED_LONG:
+                    ps.setLong(pos, (Long) col);
+                    break;
+
+                case Types.FLOAT:
+                    ps.setFloat(pos, ((Double) col).floatValue());
+                    break;
+
+                case Types.DOUBLE:
+                    ps.setDouble(pos, (Double) col);
+                    break;
+
+                case Types.DECIMAL:
+                    ps.setBigDecimal(pos, (BigDecimal) col);
+                    break;
+
+                case Types.DATE:
+                case Constant.TYPE_UNSIGNED_DATE:
+                    ps.setDate(pos, new java.sql.Date(new Date((Long) col).getTime()));
+                    break;
+
+                case Types.TIME:
+                case Constant.TYPE_UNSIGNED_TIME:
+                    ps.setTime(pos, new java.sql.Time(new Date((Long) col).getTime()));
+                    break;
+
+                case Types.TIMESTAMP:
+                case Constant.TYPE_UNSIGNED_TIMESTAMP:
+                    ps.setTimestamp(pos, new java.sql.Timestamp(new Date((Long) col).getTime()));
+                    break;

Review Comment:
   > The time object is processed as a Local* time class in Seatunnel. you can see #2024 or source code of data type.
   
   Thank you for your suggestion, I has been fixed in the new commit, refer to https://phoenix.apache.org/language/datatypes.html , transform seatunnel datatype  supported by Phoenix 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org