You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/09/14 02:34:58 UTC
[2/2] bahir-flink git commit: [BAHIR-99] kudu connector
[BAHIR-99] kudu connector
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/c760e3cf
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/c760e3cf
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/c760e3cf
Branch: refs/heads/master
Commit: c760e3cfbd23c6c550800a07ef652e7f28e3f213
Parents: ca795cc
Author: Joao Boto <bo...@boto.pro>
Authored: Wed Jul 25 20:17:36 2018 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Sep 13 19:32:13 2018 -0700
----------------------------------------------------------------------
.travis.yml | 21 +-
flink-connector-kudu/README.md | 98 +++++++++
flink-connector-kudu/dockers/docker-compose.yml | 92 ++++++++
flink-connector-kudu/dockers/role/Dockerfile | 41 ++++
.../dockers/role/docker-entrypoint.sh | 69 ++++++
flink-connector-kudu/dockers/run_kudu_tests.sh | 68 ++++++
flink-connector-kudu/dockers/start-images.sh | 42 ++++
flink-connector-kudu/dockers/stop-images.sh | 33 +++
flink-connector-kudu/pom.xml | 103 +++++++++
.../connectors/kudu/KuduInputFormat.java | 218 +++++++++++++++++++
.../connectors/kudu/KuduOutputFormat.java | 110 ++++++++++
.../streaming/connectors/kudu/KuduSink.java | 106 +++++++++
.../kudu/connector/KuduColumnInfo.java | 161 ++++++++++++++
.../kudu/connector/KuduConnector.java | 133 +++++++++++
.../kudu/connector/KuduFilterInfo.java | 173 +++++++++++++++
.../connectors/kudu/connector/KuduMapper.java | 146 +++++++++++++
.../connectors/kudu/connector/KuduRow.java | 137 ++++++++++++
.../kudu/connector/KuduTableInfo.java | 133 +++++++++++
.../connectors/kudu/KuduInputFormatTest.java | 91 ++++++++
.../connectors/kudu/KuduOuputFormatTest.java | 93 ++++++++
.../streaming/connectors/kudu/KuduSinkTest.java | 89 ++++++++
.../connectors/kudu/connector/KuduDatabase.java | 89 ++++++++
pom.xml | 3 +-
23 files changed, 2247 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 083e75d..691667c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,20 +28,39 @@ before_cache:
language: java
+services:
+ - docker
+
matrix:
include:
- jdk: oraclejdk8
env:
- FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
+ - MAVEN_PROFILE="default"
- CACHE_NAME=JDK8_F130_A
- jdk: openjdk8
env:
- FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
- CACHE_NAME=JDK8_F130_C
+ - MAVEN_PROFILE="default"
+ - CACHE_NAME=JDK8_F130_B
+ - jdk: openjdk8
+ env:
+ - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
+ - MAVEN_PROFILE="test-kudu"
+ - CACHE_NAME=JDK8_F130_KUDU
before_install:
- ./dev/change-scala-version.sh $SCALA_VERSION
install: true
-script: mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
+script:
+ - |
+ if [[ $MAVEN_PROFILE == "default" ]]; then
+ mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
+ fi
+ - |
+ if [[ $MAVEN_PROFILE == "test-kudu" ]]; then
+ flink-connector-kudu/dockers/run_kudu_tests.sh
+ fi
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
new file mode 100644
index 0000000..af2985b
--- /dev/null
+++ b/flink-connector-kudu/README.md
@@ -0,0 +1,98 @@
+# Flink Kudu Connector
+
+This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). To use this connector, add the
+following dependency to your project:
+
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>flink-connector-kudu_2.11</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </dependency>
+
+*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* (last stable version).
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html).
+
+## Installing Kudu
+
+Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html).
+Alternatively, you can use the Docker images provided in the `dockers` folder.
+
+## KuduInputFormat
+
+```
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Builder
+ .create("books")
+ .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+ .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+ .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+ .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+ .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+ .build();
+
+// Pass the Kudu master address(es) and the tableInfo to the KuduInputFormat
+env.createInput(new KuduInputFormat("172.25.0.6", tableInfo))
+ .count();
+
+env.execute();
+```
+
+## KuduOutputFormat
+
+```
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Builder
+ .create("books")
+ .createIfNotExist(true)
+ .replicas(1)
+ .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+ .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+ .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+ .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+ .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+ .build();
+
+...
+
+env.fromCollection(books)
+ .output(new KuduOutputFormat<>("172.25.0.6", tableInfo));
+
+env.execute();
+```
+
+## KuduSink
+
+```
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Builder
+ .create("books")
+ .createIfNotExist(true)
+ .replicas(1)
+ .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+ .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+ .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+ .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+ .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+ .build();
+
+...
+
+env.fromCollection(books)
+ .addSink(new KuduSink<>("172.25.0.6", tableInfo));
+
+env.execute();
+```
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/docker-compose.yml b/flink-connector-kudu/dockers/docker-compose.yml
new file mode 100644
index 0000000..d2c95bb
--- /dev/null
+++ b/flink-connector-kudu/dockers/docker-compose.yml
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+version: '2'
+
+services:
+
+ kudu-master:
+ image: eskabetxe/kudu
+ container_name: kudu-master
+ hostname: 172.25.0.6
+ ports:
+ - "8051:8051"
+ volumes:
+ - /var/lib/kudu/master
+ command: master
+ networks:
+ mynet:
+ ipv4_address: 172.25.0.6
+
+ kudu-server1:
+ image: eskabetxe/kudu
+ container_name: kudu-server1
+ hostname: 172.25.0.7
+ environment:
+ - KUDU_MASTER=172.25.0.6
+ ports:
+ - "8054:8050"
+ volumes:
+ - /var/lib/kudu/server
+ command: tserver
+ networks:
+ mynet:
+ ipv4_address: 172.25.0.7
+ links:
+ - kudu-master
+
+ kudu-server2:
+ image: eskabetxe/kudu
+ container_name: kudu-server2
+ hostname: 172.25.0.8
+ environment:
+ - KUDU_MASTER=172.25.0.6
+ ports:
+ - "8052:8050"
+ volumes:
+ - /var/lib/kudu/server
+ command: tserver
+ networks:
+ mynet:
+ ipv4_address: 172.25.0.8
+ links:
+ - kudu-master
+
+ kudu-server3:
+ image: eskabetxe/kudu
+ container_name: kudu-server3
+ hostname: 172.25.0.9
+ environment:
+ - KUDU_MASTER=172.25.0.6
+ ports:
+ - "8053:8050"
+ volumes:
+ - /var/lib/kudu/server
+ command: tserver
+ networks:
+ mynet:
+ ipv4_address: 172.25.0.9
+ links:
+ - kudu-master
+
+networks:
+ mynet:
+ driver: bridge
+ ipam:
+ config:
+ - subnet: 172.25.0.0/24
+ IPRange: 172.25.0.2/24,
+ gateway: 172.25.0.1
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/role/Dockerfile
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/role/Dockerfile b/flink-connector-kudu/dockers/role/Dockerfile
new file mode 100644
index 0000000..b14b087
--- /dev/null
+++ b/flink-connector-kudu/dockers/role/Dockerfile
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM bitnami/minideb:jessie
+MAINTAINER eskabetxe
+
+RUN set -x \
+ && apt-get update \
+ && apt-get install -y --no-install-recommends \
+ bzip2 unzip xz-utils wget \
+ && cd /etc/apt/sources.list.d \
+ && wget -qO - http://archive.cloudera.com/kudu/debian/jessie/amd64/kudu/archive.key | apt-key add - \
+ && wget http://archive.cloudera.com/kudu/debian/jessie/amd64/kudu/cloudera.list \
+ && apt-get update \
+ && apt-get install --no-install-recommends -y \
+ kudu kudu-master kudu-tserver libkuduclient0 libkuduclient-dev \
+ && rm -rf /var/lib/apt/lists/* \
+ && apt-get autoclean
+
+VOLUME /var/lib/kudu/master /var/lib/kudu/tserver
+
+COPY docker-entrypoint.sh /
+RUN chmod a+x /docker-entrypoint.sh
+
+ENTRYPOINT ["/docker-entrypoint.sh"]
+EXPOSE 8050 8051 7050 7051
+#CMD ["help"]
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/role/docker-entrypoint.sh
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/role/docker-entrypoint.sh b/flink-connector-kudu/dockers/role/docker-entrypoint.sh
new file mode 100644
index 0000000..770850c
--- /dev/null
+++ b/flink-connector-kudu/dockers/role/docker-entrypoint.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -e
+
+# Prints the list of commands understood by this entrypoint and exits with status 0.
+function do_help {
+    echo HELP:
+    echo "Supported commands:"
+    echo " master - Start a Kudu Master"
+    echo " tserver - Start a Kudu TServer"
+    echo " single - Start a Kudu Master+TServer in one container"
+    echo " kudu - Run the Kudu CLI"
+    echo " help - print useful information and exit"
+    echo ""
+    echo "Other commands can be specified to run shell commands."
+    echo "Set the environment variable KUDU_OPTS to pass additional"
+    echo "arguments to the kudu process. DEFAULT_KUDU_OPTS contains"
+    echo "a recommended base set of options."
+
+    # Showing help is a successful outcome, so terminate the entrypoint cleanly.
+    exit 0
+}
+
+DEFAULT_KUDU_OPTS="-logtostderr \
+ -fs_wal_dir=/var/lib/kudu/$1 \
+ -fs_data_dirs=/var/lib/kudu/$1 \
+ -use_hybrid_clock=false"
+
+KUDU_OPTS=${KUDU_OPTS:-${DEFAULT_KUDU_OPTS}}
+
+if [ "$1" = 'master' ]; then
+ exec kudu-master -fs_wal_dir /var/lib/kudu/master ${KUDU_OPTS}
+elif [ "$1" = 'tserver' ]; then
+ exec kudu-tserver -fs_wal_dir /var/lib/kudu/tserver \
+ -tserver_master_addrs ${KUDU_MASTER} ${KUDU_OPTS}
+elif [ "$1" = 'single' ]; then
+ KUDU_MASTER=boot2docker
+ KUDU_MASTER_OPTS="-logtostderr \
+ -fs_wal_dir=/var/lib/kudu/master \
+ -fs_data_dirs=/var/lib/kudu/master \
+ -use_hybrid_clock=false"
+ KUDU_TSERVER_OPTS="-logtostderr \
+ -fs_wal_dir=/var/lib/kudu/tserver \
+ -fs_data_dirs=/var/lib/kudu/tserver \
+ -use_hybrid_clock=false"
+ exec kudu-master -fs_wal_dir /var/lib/kudu/master ${KUDU_MASTER_OPTS} &
+ sleep 5
+ exec kudu-tserver -fs_wal_dir /var/lib/kudu/tserver -tserver_master_addrs ${KUDU_MASTER} ${KUDU_TSERVER_OPTS}
+elif [ "$1" = 'kudu' ]; then
+ shift; # Remove first arg and pass remainder to kudu cli
+ exec kudu "$@"
+elif [ "$1" = 'help' ]; then
+ do_help
+fi
+
+exec "$@"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/run_kudu_tests.sh
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/run_kudu_tests.sh b/flink-connector-kudu/dockers/run_kudu_tests.sh
new file mode 100755
index 0000000..58593d6
--- /dev/null
+++ b/flink-connector-kudu/dockers/run_kudu_tests.sh
@@ -0,0 +1,68 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+# Runs all tests with Kudu server in docker containers.
+
+set -euo pipefail -x
+
+# http://stackoverflow.com/questions/3572030/bash-script-absolute-path-with-osx
+function absolutepath() {
+ [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}"
+}
+
+SCRIPT_DIR=$(dirname $(absolutepath "$0"))
+
+PROJECT_ROOT="${SCRIPT_DIR}/../.."
+
+DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml"
+
+
+function build_image() {
+ docker build -t eskabetxe/kudu ${SCRIPT_DIR}/role
+
+ #docker-compose build -f "${DOCKER_COMPOSE_LOCATION}"
+}
+
+function start_docker_container() {
+ # stop already running containers
+ #docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down || true
+
+ # start containers
+ docker-compose -f "${DOCKER_COMPOSE_LOCATION}" up -d
+}
+
+function cleanup_docker_container() {
+ docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down
+ #true
+}
+
+build_image
+
+start_docker_container
+
+#run product tests
+pushd ${PROJECT_ROOT}
+set +e
+mvn test -pl flink-connector-kudu -P test-kudu
+EXIT_CODE=$?
+set -e
+popd
+
+cleanup_docker_container
+
+exit ${EXIT_CODE}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/start-images.sh
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/start-images.sh b/flink-connector-kudu/dockers/start-images.sh
new file mode 100755
index 0000000..fad3de6
--- /dev/null
+++ b/flink-connector-kudu/dockers/start-images.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function absolutepath() {
+ [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}"
+}
+
+function build_image() {
+ docker build -t eskabetxe/kudu ${SCRIPT_DIR}/role
+
+ #docker-compose build -f "${DOCKER_COMPOSE_LOCATION}"
+}
+
+function start_docker_container() {
+ # stop already running containers
+ docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down || true
+
+ # start containers
+ docker-compose -f "${DOCKER_COMPOSE_LOCATION}" up -d
+}
+
+SCRIPT_DIR=$(dirname $(absolutepath "$0"))
+DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml"
+
+build_image
+
+start_docker_container
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/stop-images.sh
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/stop-images.sh b/flink-connector-kudu/dockers/stop-images.sh
new file mode 100755
index 0000000..9ae52c1
--- /dev/null
+++ b/flink-connector-kudu/dockers/stop-images.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function absolutepath() {
+ [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}"
+}
+
+# Stops and removes the compose-managed Kudu containers, then deletes the locally built image.
+function cleanup_docker_container() {
+    docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down
+
+    # eskabetxe/kudu is the image tag produced by `docker build -t` in start-images.sh;
+    # images are removed with `docker rmi` (`docker rm` only accepts containers).
+    docker rmi eskabetxe/kudu
+}
+
+SCRIPT_DIR=$(dirname $(absolutepath "$0"))
+DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml"
+
+cleanup_docker_container
+
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
new file mode 100644
index 0000000..348371b
--- /dev/null
+++ b/flink-connector-kudu/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-flink-parent_2.11</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-kudu_2.11</artifactId>
+ <name>flink-connector-kudu</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <kudu.version>1.7.1</kudu.version>
+ <junit.version>5.2.0</junit.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu.version}</version>
+ </dependency>
+
+ <!--test dependencies-->
+
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>default</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/*Test.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>test-kudu</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
new file mode 100644
index 0000000..617e317
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.streaming.connectors.kudu.connector.*;
+import org.apache.flink.util.Preconditions;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Flink batch input format that reads {@link KuduRow}s from a Kudu table.
+ *
+ * <p>{@link #createInputSplits(int)} turns every Kudu scan token into one
+ * {@link KuduInputSplit}; {@link #open(KuduInputSplit)} rebuilds a scanner from the
+ * serialized token and {@link #nextRecord(KuduRow)} drains it batch by batch.
+ * Filters, column projections and a row limit can be configured through the
+ * fluent {@code with...} methods before the format is handed to Flink.
+ */
+public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.KuduInputSplit> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class);
+
+    private String kuduMasters;
+    private KuduTableInfo tableInfo;
+    private List<KuduFilterInfo> tableFilters;
+    private List<String> tableProjections;
+    private Long rowsLimit;
+    private boolean endReached;
+
+    // Runtime handles; rebuilt after deserialization instead of being shipped with the format.
+    private transient KuduConnector tableContext;
+    private transient KuduScanner scanner;
+    private transient RowResultIterator resultIterator;
+
+    /**
+     * Creates an input format for the given table.
+     *
+     * @param kuduMasters address(es) of the Kudu master(s); must not be null
+     * @param tableInfo   description of the table to read; must not be null
+     */
+    public KuduInputFormat(String kuduMasters, KuduTableInfo tableInfo) {
+        Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
+        this.kuduMasters = kuduMasters;
+
+        Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
+        this.tableInfo = tableInfo;
+
+        this.endReached = false;
+    }
+
+    /**
+     * Sets the filters passed on when the scan tokens are created.
+     *
+     * @param tableFilters filters to apply
+     * @return this format, for chaining
+     */
+    public KuduInputFormat withTableFilters(KuduFilterInfo... tableFilters) {
+        return withTableFilters(Arrays.asList(tableFilters));
+    }
+
+    /**
+     * Sets the filters passed on when the scan tokens are created.
+     *
+     * @param tableFilters filters to apply
+     * @return this format, for chaining
+     */
+    public KuduInputFormat withTableFilters(List<KuduFilterInfo> tableFilters) {
+        this.tableFilters = tableFilters;
+        return this;
+    }
+
+    /**
+     * Sets the column projections passed on when the scan tokens are created.
+     *
+     * @param tableProjections names of the columns to read
+     * @return this format, for chaining
+     */
+    public KuduInputFormat withTableProjections(String... tableProjections) {
+        return withTableProjections(Arrays.asList(tableProjections));
+    }
+
+    /**
+     * Sets the column projections passed on when the scan tokens are created.
+     *
+     * @param tableProjections names of the columns to read
+     * @return this format, for chaining
+     */
+    public KuduInputFormat withTableProjections(List<String> tableProjections) {
+        this.tableProjections = tableProjections;
+        return this;
+    }
+
+    /**
+     * Sets the row limit passed on when the scan tokens are created.
+     *
+     * @param rowsLimit maximum number of rows, or null for no explicit limit value
+     * @return this format, for chaining
+     */
+    public KuduInputFormat withRowsLimit(Long rowsLimit) {
+        this.rowsLimit = rowsLimit;
+        return this;
+    }
+
+    /** No-op: all settings are supplied through the constructor and the {@code with...} methods. */
+    @Override
+    public void configure(Configuration parameters) {
+
+    }
+
+    /**
+     * Opens a scanner for the given split and fetches its first batch of rows.
+     *
+     * @param split split whose serialized scan token identifies the data to read
+     * @throws IOException if the connector or the scanner cannot be created
+     */
+    @Override
+    public void open(KuduInputSplit split) throws IOException {
+        endReached = false;
+        startTableContext();
+
+        scanner = tableContext.scanner(split.getScanToken());
+        resultIterator = scanner.nextRows();
+    }
+
+    /** Closes the current scanner, if any; a failure to close is logged rather than propagated. */
+    @Override
+    public void close() {
+        if (scanner == null) {
+            return;
+        }
+        try {
+            scanner.close();
+        } catch (KuduException e) {
+            // Best effort: report through the class logger instead of printing to stderr.
+            LOG.error("Error while closing the Kudu scanner", e);
+        }
+    }
+
+    /** Returns the cached statistics unchanged; this format computes none of its own. */
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+        return cachedStatistics;
+    }
+
+    @Override
+    public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) {
+        return new LocatableInputSplitAssigner(inputSplits);
+    }
+
+    /** Lazily creates the connector so that it is built at most once per format instance. */
+    private void startTableContext() throws IOException {
+        if (tableContext == null) {
+            tableContext = new KuduConnector(kuduMasters, tableInfo);
+        }
+    }
+
+    /**
+     * Creates one input split per Kudu scan token, recording the replica endpoints of the
+     * token's tablet as the split's host names.
+     *
+     * @param minNumSplits minimum number of splits desired by Flink; only used to log a
+     *                     warning when Kudu yields fewer tokens
+     * @return the splits, indexed by their split number
+     * @throws IOException if the connector cannot be created or the tokens cannot be built
+     */
+    @Override
+    public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        startTableContext();
+        Preconditions.checkNotNull(tableContext,"tableContext should not be null");
+
+        List<KuduScanToken> tokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit);
+
+        KuduInputSplit[] splits = new KuduInputSplit[tokens.size()];
+
+        for (int i = 0; i < tokens.size(); i++) {
+            KuduScanToken token = tokens.get(i);
+
+            List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+
+            for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+                locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort()));
+            }
+
+            splits[i] = new KuduInputSplit(
+                    token.serialize(),
+                    i,
+                    locations.toArray(new String[locations.size()])
+            );
+        }
+
+        if (splits.length < minNumSplits) {
+            LOG.warn(" The minimum desired number of splits with your configured parallelism level " +
+                            "is {}. Current kudu splits = {}. {} instances will remain idle.",
+                    minNumSplits,
+                    splits.length,
+                    (minNumSplits - splits.length)
+            );
+        }
+
+        return splits;
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return endReached;
+    }
+
+    /**
+     * Returns the next row of the current split.
+     *
+     * @param reuse ignored; a new {@link KuduRow} is produced for every record
+     * @return the next row, or {@code null} once the scanner is exhausted (in which case
+     *         {@link #reachedEnd()} starts returning {@code true})
+     * @throws IOException if fetching the next batch from Kudu fails
+     */
+    @Override
+    public KuduRow nextRecord(KuduRow reuse) throws IOException {
+        // Skip over exhausted (or empty) batches iteratively rather than recursing per batch.
+        while (!resultIterator.hasNext()) {
+            if (!scanner.hasMoreRows()) {
+                endReached = true;
+                return null;
+            }
+            resultIterator = scanner.nextRows();
+        }
+        return KuduMapper.toKuduRow(resultIterator.next());
+    }
+
+    /**
+     * Returns an endpoint in the following format: {@code <host>:<port>}
+     *
+     * @param host Hostname
+     * @param port Port
+     * @return Formatted endpoint
+     */
+    private String getLocation(String host, Integer port) {
+        return host + ":" + port;
+    }
+
+
+    /**
+     * Input split carrying a serialized Kudu scan token.
+     *
+     * <p>Declared {@code static} so that a split does not hold (and serialize) a hidden
+     * reference to the enclosing {@link KuduInputFormat}.
+     */
+    public static class KuduInputSplit extends LocatableInputSplit {
+
+        private byte[] scanToken;
+
+        /**
+         * Creates a new KuduInputSplit
+         * @param scanToken the serialized scan token describing the data of this split
+         * @param splitNumber the number of the input split
+         * @param hostnames The names of the hosts storing the data this input split refers to.
+         */
+        public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) {
+            super(splitNumber, hostnames);
+
+            this.scanToken = scanToken;
+        }
+
+        public byte[] getScanToken() {
+            return scanToken;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
new file mode 100644
index 0000000..5c23f36
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Flink {@link OutputFormat} that writes every record to a Kudu table through a
+ * {@link KuduConnector}.
+ *
+ * <p>A new instance uses strong consistency and upsert write mode; both can be changed
+ * with the fluent {@code with*} methods.
+ *
+ * @param <OUT> record type, a {@link KuduRow} or a subclass of it
+ */
+public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
+
+    private String kuduMasters;
+    private KuduTableInfo tableInfo;
+    private KuduConnector.Consistency consistency;
+    private KuduConnector.WriteMode writeMode;
+
+    // Created in open(); transient, so it is not part of the serialized format.
+    private transient KuduConnector tableContext;
+
+
+    /**
+     * @param kuduMasters address(es) of the Kudu master(s), must not be null
+     * @param tableInfo   description of the target table, must not be null
+     */
+    public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo) {
+        Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
+        this.kuduMasters = kuduMasters;
+
+        Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
+        this.tableInfo = tableInfo;
+        this.consistency = KuduConnector.Consistency.STRONG;
+        this.writeMode = KuduConnector.WriteMode.UPSERT;
+    }
+
+    /** Switches to {@link KuduConnector.Consistency#EVENTUAL} and returns this format. */
+    public KuduOutputFormat<OUT> withEventualConsistency() {
+        this.consistency = KuduConnector.Consistency.EVENTUAL;
+        return this;
+    }
+
+    /** Switches to {@link KuduConnector.Consistency#STRONG} and returns this format. */
+    public KuduOutputFormat<OUT> withStrongConsistency() {
+        this.consistency = KuduConnector.Consistency.STRONG;
+        return this;
+    }
+
+    /** Switches to {@link KuduConnector.WriteMode#UPSERT} and returns this format. */
+    public KuduOutputFormat<OUT> withUpsertWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.UPSERT;
+        return this;
+    }
+
+    /** Switches to {@link KuduConnector.WriteMode#INSERT} and returns this format. */
+    public KuduOutputFormat<OUT> withInsertWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.INSERT;
+        return this;
+    }
+
+    /** Switches to {@link KuduConnector.WriteMode#UPDATE} and returns this format. */
+    public KuduOutputFormat<OUT> withUpdateWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.UPDATE;
+        return this;
+    }
+
+    /** No configuration is read from {@code parameters}. */
+    @Override
+    public void configure(Configuration parameters) {
+
+    }
+
+    /**
+     * Creates the connector if this instance does not hold one yet.
+     *
+     * @throws IOException if the connector cannot be created
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        startTableContext();
+    }
+
+    private void startTableContext() throws IOException {
+        if (tableContext != null) return;
+        tableContext = new KuduConnector(kuduMasters, tableInfo);
+    }
+
+    /**
+     * Writes one row with the configured consistency and write mode.
+     *
+     * @throws IOException wrapping any exception raised by the connector
+     */
+    @Override
+    public void writeRecord(OUT kuduRow) throws IOException {
+        try {
+            tableContext.writeRow(kuduRow, consistency, writeMode);
+        } catch (Exception e) {
+            throw new IOException(e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Closes the connector, if one was created.
+     *
+     * @throws IOException wrapping any exception raised while closing
+     */
+    @Override
+    public void close() throws IOException {
+        if (this.tableContext == null) return;
+        try {
+            this.tableContext.close();
+        } catch (Exception e) {
+            throw new IOException(e.getLocalizedMessage(), e);
+        } finally {
+            // Forget the closed connector: startTableContext() only builds a new one
+            // when the field is null, so keeping it would make a later open() reuse
+            // a closed connector.
+            this.tableContext = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
new file mode 100644
index 0000000..120d5c5
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Flink streaming sink that writes every incoming record to a Kudu table through a
+ * {@link KuduConnector}.
+ *
+ * <p>A new instance uses strong consistency and upsert write mode; both can be changed
+ * with the fluent {@code with*} methods.
+ *
+ * @param <OUT> record type, a {@link KuduRow} or a subclass of it
+ */
+public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> {
+
+    // Logger is bound to this class (it used to be created for KuduOutputFormat).
+    private static final Logger LOG = LoggerFactory.getLogger(KuduSink.class);
+
+    private String kuduMasters;
+    private KuduTableInfo tableInfo;
+    private KuduConnector.Consistency consistency;
+    private KuduConnector.WriteMode writeMode;
+
+    // Created in open(); transient, so it is not part of the serialized sink.
+    private transient KuduConnector tableContext;
+
+
+    /**
+     * @param kuduMasters address(es) of the Kudu master(s), must not be null
+     * @param tableInfo   description of the target table, must not be null
+     */
+    public KuduSink(String kuduMasters, KuduTableInfo tableInfo) {
+        Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
+        this.kuduMasters = kuduMasters;
+
+        Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
+        this.tableInfo = tableInfo;
+        this.consistency = KuduConnector.Consistency.STRONG;
+        this.writeMode = KuduConnector.WriteMode.UPSERT;
+    }
+
+    /** Switches to {@link KuduConnector.Consistency#EVENTUAL} and returns this sink. */
+    public KuduSink<OUT> withEventualConsistency() {
+        this.consistency = KuduConnector.Consistency.EVENTUAL;
+        return this;
+    }
+
+    /** Switches to {@link KuduConnector.Consistency#STRONG} and returns this sink. */
+    public KuduSink<OUT> withStrongConsistency() {
+        this.consistency = KuduConnector.Consistency.STRONG;
+        return this;
+    }
+
+    /** Switches to {@link KuduConnector.WriteMode#UPSERT} and returns this sink. */
+    public KuduSink<OUT> withUpsertWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.UPSERT;
+        return this;
+    }
+
+    /** Switches to {@link KuduConnector.WriteMode#INSERT} and returns this sink. */
+    public KuduSink<OUT> withInsertWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.INSERT;
+        return this;
+    }
+
+    /** Switches to {@link KuduConnector.WriteMode#UPDATE} and returns this sink. */
+    public KuduSink<OUT> withUpdateWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.UPDATE;
+        return this;
+    }
+
+    /**
+     * Creates the connector if this instance does not hold one yet.
+     *
+     * @throws IOException if the connector cannot be created
+     */
+    @Override
+    public void open(Configuration parameters) throws IOException {
+        startTableContext();
+    }
+
+    private void startTableContext() throws IOException {
+        if (tableContext != null) return;
+        tableContext = new KuduConnector(kuduMasters, tableInfo);
+    }
+
+
+    /**
+     * Writes one row with the configured consistency and write mode.
+     *
+     * @throws Exception an {@link IOException} wrapping whatever the connector raised
+     */
+    @Override
+    public void invoke(OUT kuduRow) throws Exception {
+        try {
+            tableContext.writeRow(kuduRow, consistency, writeMode);
+        } catch (Exception e) {
+            throw new IOException(e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Closes the connector, if one was created.
+     *
+     * @throws Exception an {@link IOException} wrapping whatever closing raised
+     */
+    @Override
+    public void close() throws Exception {
+        if (this.tableContext == null) return;
+        try {
+            this.tableContext.close();
+        } catch (Exception e) {
+            throw new IOException(e.getLocalizedMessage(), e);
+        } finally {
+            // Forget the closed connector: startTableContext() only builds a new one
+            // when the field is null, so keeping it would make a later open() reuse
+            // a closed connector.
+            this.tableContext = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
new file mode 100644
index 0000000..4dfc0b8
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+
+import java.io.Serializable;
+
+/**
+ * Serializable description of a single table column that can be turned into a Kudu
+ * {@link ColumnSchema}. Instances are created through {@link Builder}.
+ */
+public class KuduColumnInfo implements Serializable {
+
+    private String name;
+    private Type type;
+
+    // Defaults applied to every new column until the builder overrides them.
+    private boolean key = false;
+    private boolean rangeKey = false;
+    private boolean hashKey = false;
+    private boolean nullable = false;
+    private Object defaultValue = null;
+    private int blockSize = 0;
+    private Encoding encoding = Encoding.AUTO;
+    private Compression compression = Compression.DEFAULT;
+
+    private KuduColumnInfo(String name, Type type) {
+        this.name = name;
+        this.type = type;
+    }
+
+    /** @return the column name */
+    protected String name() {
+        return name;
+    }
+
+    /** @return the range-key flag set through the builder */
+    protected boolean isRangeKey() {
+        return rangeKey;
+    }
+
+    /** @return the hash-key flag set through the builder */
+    protected boolean isHashKey() {
+        return hashKey;
+    }
+
+    /**
+     * Converts this description into a Kudu column schema.
+     *
+     * @return a new {@link ColumnSchema} built from the current settings
+     */
+    protected ColumnSchema columnSchema() {
+        return new ColumnSchema.ColumnSchemaBuilder(name, type)
+                .key(key)
+                .nullable(nullable)
+                .defaultValue(defaultValue)
+                .desiredBlockSize(blockSize)
+                .encoding(encoding.encode)
+                .compressionAlgorithm(compression.algorithm)
+                .build();
+    }
+
+    /**
+     * Fluent builder for {@link KuduColumnInfo}. Every setter writes straight into the
+     * instance that {@link #build()} hands out.
+     */
+    public static class Builder {
+        private KuduColumnInfo target;
+
+        private Builder(String name, Type type) {
+            this.target = new KuduColumnInfo(name, type);
+        }
+
+        /** Starts a builder for a column with the given name and type. */
+        public static Builder create(String name, Type type) {
+            return new Builder(name, type);
+        }
+
+        public Builder key(boolean key) {
+            target.key = key;
+            return this;
+        }
+
+        public Builder rangeKey(boolean rangeKey) {
+            target.rangeKey = rangeKey;
+            return this;
+        }
+
+        public Builder hashKey(boolean hashKey) {
+            target.hashKey = hashKey;
+            return this;
+        }
+
+        public Builder nullable(boolean nullable) {
+            target.nullable = nullable;
+            return this;
+        }
+
+        public Builder defaultValue(Object defaultValue) {
+            target.defaultValue = defaultValue;
+            return this;
+        }
+
+        public Builder desiredBlockSize(int blockSize) {
+            target.blockSize = blockSize;
+            return this;
+        }
+
+        public Builder encoding(Encoding encoding) {
+            target.encoding = encoding;
+            return this;
+        }
+
+        public Builder compressionAlgorithm(Compression compression) {
+            target.compression = compression;
+            return this;
+        }
+
+        /** @return the column description configured so far (the same instance on every call) */
+        public KuduColumnInfo build() {
+            return target;
+        }
+    }
+
+    /** Compression choices, each mapped to its Kudu {@link ColumnSchema.CompressionAlgorithm}. */
+    public enum Compression {
+        UNKNOWN(ColumnSchema.CompressionAlgorithm.UNKNOWN),
+        DEFAULT(ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION),
+        WITHOUT(ColumnSchema.CompressionAlgorithm.NO_COMPRESSION),
+        SNAPPY(ColumnSchema.CompressionAlgorithm.SNAPPY),
+        LZ4(ColumnSchema.CompressionAlgorithm.LZ4),
+        ZLIB(ColumnSchema.CompressionAlgorithm.ZLIB);
+
+        final ColumnSchema.CompressionAlgorithm algorithm;
+
+        Compression(ColumnSchema.CompressionAlgorithm algorithm) {
+            this.algorithm = algorithm;
+        }
+    }
+
+    /** Encoding choices, each mapped to its Kudu {@link ColumnSchema.Encoding}. */
+    public enum Encoding {
+        UNKNOWN(ColumnSchema.Encoding.UNKNOWN),
+        AUTO(ColumnSchema.Encoding.AUTO_ENCODING),
+        PLAIN(ColumnSchema.Encoding.PLAIN_ENCODING),
+        PREFIX(ColumnSchema.Encoding.PREFIX_ENCODING),
+        GROUP_VARINT(ColumnSchema.Encoding.GROUP_VARINT),
+        RLE(ColumnSchema.Encoding.RLE),
+        DICT(ColumnSchema.Encoding.DICT_ENCODING),
+        BIT_SHUFFLE(ColumnSchema.Encoding.BIT_SHUFFLE);
+
+        final ColumnSchema.Encoding encode;
+
+        Encoding(ColumnSchema.Encoding encode) {
+            this.encode = encode;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
new file mode 100644
index 0000000..0e2e6bc
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import com.stumbleupon.async.Callback;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Wrapper around an {@link AsyncKuduClient} bound to one Kudu table. It opens (or
+ * optionally creates) the table on construction and offers scan-token, scanner and
+ * single-row write helpers.
+ */
+public class KuduConnector implements AutoCloseable {
+
+    private final Logger LOG = LoggerFactory.getLogger(this.getClass());
+
+    public enum Consistency {EVENTUAL, STRONG}
+    public enum WriteMode {INSERT,UPDATE,UPSERT}
+
+    private AsyncKuduClient client;
+    private KuduTable table;
+
+    /**
+     * Connects to the given masters and opens the described table.
+     *
+     * @param kuduMasters address(es) of the Kudu master(s)
+     * @param tableInfo   description of the table to open or create
+     * @throws IOException                   if the table cannot be opened or created
+     * @throws UnsupportedOperationException if the table does not exist and
+     *                                       {@code tableInfo} does not allow creating it
+     */
+    public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException {
+        client = client(kuduMasters);
+        try {
+            table = table(tableInfo);
+        } catch (IOException | RuntimeException e) {
+            // The caller never receives this instance, so nobody else could close the client.
+            releaseClientAfterFailure(e);
+            throw e;
+        }
+    }
+
+    private AsyncKuduClient client(String kuduMasters) {
+        return new AsyncKuduClient.AsyncKuduClientBuilder(kuduMasters).build();
+    }
+
+    /** Closes the client after a failed construction, attaching any close failure to {@code cause}. */
+    private void releaseClientAfterFailure(Exception cause) {
+        try {
+            client.close();
+        } catch (Exception closeFailure) {
+            cause.addSuppressed(closeFailure);
+        } finally {
+            client = null;
+        }
+    }
+
+    private KuduTable table(KuduTableInfo infoTable) throws IOException {
+        KuduClient syncClient = client.syncClient();
+
+        String tableName = infoTable.getName();
+        if (syncClient.tableExists(tableName)) {
+            return syncClient.openTable(tableName);
+        }
+        if (infoTable.createIfNotExist()) {
+            return syncClient.createTable(tableName, infoTable.getSchema(), infoTable.getCreateTableOptions());
+        }
+        throw new UnsupportedOperationException("table does not exist and is marked to not be created");
+    }
+
+    /**
+     * Deletes the table this connector is bound to.
+     *
+     * @return always {@code true}; failures surface as exceptions
+     */
+    public boolean deleteTable() throws IOException {
+        String tableName = table.getName();
+        client.syncClient().deleteTable(tableName);
+        return true;
+    }
+
+    /** Rebuilds a scanner from a serialized scan token, using the synchronous client. */
+    public KuduScanner scanner(byte[] token) throws IOException {
+        return KuduScanToken.deserializeIntoScanner(token, client.syncClient());
+    }
+
+    /**
+     * Builds the scan tokens for this table.
+     *
+     * @param tableFilters     predicates to apply; ignored when null or empty
+     * @param tableProjections column names to project; ignored when null or empty
+     * @param rowLimit         row limit; ignored when null or not positive
+     * @return the tokens produced by the token builder
+     */
+    public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Long rowLimit) {
+        KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.syncClient().newScanTokenBuilder(table);
+
+        if (CollectionUtils.isNotEmpty(tableProjections)) {
+            tokenBuilder.setProjectedColumnNames(tableProjections);
+        }
+
+        if (CollectionUtils.isNotEmpty(tableFilters)) {
+            tableFilters.stream()
+                    .map(filter -> filter.toPredicate(table.getSchema()))
+                    .forEach(tokenBuilder::addPredicate);
+        }
+
+        if (rowLimit !=null && rowLimit > 0) {
+            tokenBuilder.limit(rowLimit);
+            // FIXME: https://issues.apache.org/jira/browse/KUDU-16
+            // Server side limit() operator for java-based scanners are not implemented yet
+        }
+
+        return tokenBuilder.build();
+    }
+
+    /**
+     * Writes a single row in its own session.
+     *
+     * <p>The responses of {@code apply}, {@code flush} (synchronous path) and
+     * {@code close} are all inspected, so a row error reported at any of these steps
+     * is logged and reflected in the return value.
+     *
+     * @param row         the row to write
+     * @param consistency {@link Consistency#EVENTUAL} uses an asynchronous session, anything
+     *                    else a synchronous one
+     * @param writeMode   insert, update or upsert
+     * @return {@code true} when no inspected response carried a row error
+     * @throws Exception whatever the Kudu client raises
+     */
+    public boolean writeRow(KuduRow row, Consistency consistency, WriteMode writeMode) throws Exception {
+        final Operation operation = KuduMapper.toOperation(table, writeMode, row);
+
+        if (Consistency.EVENTUAL.equals(consistency)) {
+            AsyncKuduSession session = client.newSession();
+            boolean applied = session.apply(operation).addCallback(new SingleResponseCallback()).join();
+            session.flush();
+            boolean closed = session.close().addCallback(new ResponseCallback()).join();
+            return applied && closed;
+        } else {
+            KuduSession session = client.syncClient().newSession();
+            boolean ok = isSuccessful(session.apply(operation));
+            // Non-short-circuit '&' on purpose: every step must run and be checked.
+            ok = processResponse(session.flush()) & ok;
+            ok = processResponse(session.close()) & ok;
+            return ok;
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (client == null) return;
+
+        client.close();
+    }
+
+    /**
+     * Logs every row error in the batch.
+     *
+     * @return {@code true} when the batch is null, empty or free of row errors
+     */
+    private Boolean processResponse(List<OperationResponse> operationResponses) {
+        boolean isOk = true;
+        if (operationResponses == null) return isOk;
+        for(OperationResponse operationResponse : operationResponses) {
+            if (!isSuccessful(operationResponse)) {
+                isOk = false;
+            }
+        }
+        return isOk;
+    }
+
+    /** Logs the row error of a single response, if any; a null response counts as success. */
+    private boolean isSuccessful(OperationResponse operationResponse) {
+        if (operationResponse == null || !operationResponse.hasRowError()) return true;
+        logResponseError(operationResponse.getRowError());
+        return false;
+    }
+
+    private void logResponseError(RowError error) {
+        LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString());
+    }
+
+    /** Deferred callback that evaluates a batch of responses. */
+    private class ResponseCallback implements Callback<Boolean, List<OperationResponse>> {
+        @Override
+        public Boolean call(List<OperationResponse> operationResponses) {
+            return processResponse(operationResponses);
+        }
+    }
+
+    /** Deferred callback that evaluates the response of a single applied operation. */
+    private class SingleResponseCallback implements Callback<Boolean, OperationResponse> {
+        @Override
+        public Boolean call(OperationResponse operationResponse) {
+            return isSuccessful(operationResponse);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
new file mode 100644
index 0000000..bd20fc8
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.KuduPredicate;
+
+import java.util.List;
+
+
+/**
+ * Description of a single column filter that can be converted into a
+ * {@link KuduPredicate}. Instances are created through {@link Builder}.
+ *
+ * <p>Serializable like the sibling {@code KuduTableInfo} and {@code KuduColumnInfo}
+ * descriptors; the filter value itself must be serializable for that to work.
+ * NOTE(review): presumably filters are held by the (serializable) input format -
+ * confirm against its fields.
+ */
+public class KuduFilterInfo implements java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String column;
+    private FilterType type;
+    private Object value;
+
+    private KuduFilterInfo() { }
+
+    /**
+     * Builds the predicate against the column of {@code schema} named like this filter.
+     */
+    public KuduPredicate toPredicate(Schema schema) {
+        return toPredicate(schema.getColumn(this.column));
+    }
+
+    /**
+     * Builds the predicate for the given column.
+     *
+     * @param column schema of the column the predicate applies to
+     * @return an in-list, is-null or is-not-null predicate for those filter types,
+     *         otherwise a comparison predicate typed after the column
+     * @throws IllegalArgumentException for a comparison on an unsupported column type
+     * @throws ClassCastException       if the filter value does not match the column type
+     */
+    public KuduPredicate toPredicate(ColumnSchema column) {
+        KuduPredicate predicate;
+        switch (this.type) {
+            case IS_IN:
+                predicate = KuduPredicate.newInListPredicate(column, (List<?>) this.value);
+                break;
+            case IS_NULL:
+                predicate = KuduPredicate.newIsNullPredicate(column);
+                break;
+            case IS_NOT_NULL:
+                predicate = KuduPredicate.newIsNotNullPredicate(column);
+                break;
+            default:
+                predicate = predicateComparator(column);
+                break;
+        }
+        return predicate;
+    }
+
+    /** Builds a comparison predicate, casting the filter value according to the column type. */
+    private KuduPredicate predicateComparator(ColumnSchema column) {
+
+        KuduPredicate.ComparisonOp comparison = this.type.comparator;
+
+        KuduPredicate predicate;
+
+        switch (column.getType()) {
+            case STRING:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String)this.value);
+                break;
+            case FLOAT:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Float)this.value);
+                break;
+            case INT8:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Byte)this.value);
+                break;
+            case INT16:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Short)this.value);
+                break;
+            case INT32:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Integer)this.value);
+                break;
+            case INT64:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Long)this.value);
+                break;
+            case DOUBLE:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Double)this.value);
+                break;
+            case BOOL:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Boolean)this.value);
+                break;
+            case UNIXTIME_MICROS:
+                // The value is scaled by 1000 before comparing; presumably callers pass
+                // milliseconds and the column stores microseconds.
+                Long time = (Long)this.value;
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, time*1000);
+                break;
+            case BINARY:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte[])this.value);
+                break;
+            default:
+                throw new IllegalArgumentException("Illegal var type: " + column.getType());
+        }
+        return predicate;
+    }
+
+    /**
+     * Fluent builder for {@link KuduFilterInfo}. Each filter method overwrites the
+     * previously chosen type and value.
+     */
+    public static class Builder {
+        private KuduFilterInfo filter;
+
+        private Builder(String column) {
+            this.filter = new KuduFilterInfo();
+            this.filter.column = column;
+        }
+
+        /** Starts a builder for a filter on the named column. */
+        public static Builder create(String column) {
+            return new Builder(column);
+        }
+
+        public Builder greaterThan(Object value) {
+            return filter(FilterType.GREATER, value);
+        }
+
+        public Builder lessThan(Object value) {
+            return filter(FilterType.LESS, value);
+        }
+
+        public Builder equalTo(Object value) {
+            return filter(FilterType.EQUAL, value);
+        }
+
+        public Builder greaterOrEqualTo(Object value) {
+            return filter(FilterType.GREATER_EQUAL, value);
+        }
+
+        public Builder lessOrEqualTo(Object value) {
+            return filter(FilterType.LESS_EQUAL, value);
+        }
+
+        public Builder isNotNull() {
+            return filter(FilterType.IS_NOT_NULL, null);
+        }
+
+        public Builder isNull() {
+            return filter(FilterType.IS_NULL, null);
+        }
+
+        public Builder isIn(List<?> values) {
+            return filter(FilterType.IS_IN, values);
+        }
+
+        /** Sets the filter type and value directly. */
+        public Builder filter(FilterType type, Object value) {
+            this.filter.type = type;
+            this.filter.value = value;
+            return this;
+        }
+
+        /** @return the filter configured so far (the same instance on every call) */
+        public KuduFilterInfo build() {
+            return filter;
+        }
+    }
+
+    /**
+     * Supported filter kinds. Comparison kinds carry the matching Kudu comparison
+     * operator; the null and in-list kinds carry {@code null}.
+     */
+    public enum FilterType {
+        GREATER(KuduPredicate.ComparisonOp.GREATER),
+        GREATER_EQUAL(KuduPredicate.ComparisonOp.GREATER_EQUAL),
+        EQUAL(KuduPredicate.ComparisonOp.EQUAL),
+        LESS(KuduPredicate.ComparisonOp.LESS),
+        LESS_EQUAL(KuduPredicate.ComparisonOp.LESS_EQUAL),
+        IS_NOT_NULL(null),
+        IS_NULL(null),
+        IS_IN(null);
+
+        final KuduPredicate.ComparisonOp comparator;
+
+        FilterType(KuduPredicate.ComparisonOp comparator) {
+            this.comparator = comparator;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
new file mode 100644
index 0000000..b1366ba
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+
+import java.util.List;
+
+public final class KuduMapper {
+
+ private KuduMapper() { }
+
+ public static KuduRow toKuduRow(RowResult row) {
+ Schema schema = row.getColumnProjection();
+ List<ColumnSchema> columns = schema.getColumns();
+
+ KuduRow values = new KuduRow(columns.size());
+ for (int i = 0; i < columns.size(); i++) {
+ String name = schema.getColumnByIndex(i).getName();
+ if(row.isNull(i)) {
+ values.setField(i, name, null);
+ } else {
+ Type type = schema.getColumnByIndex(i).getType();
+ switch (type) {
+ case BINARY:
+ values.setField(i, name, row.getBinary(i));
+ break;
+ case STRING:
+ values.setField(i, name, row.getString(i));
+ break;
+ case BOOL:
+ values.setField(i, name, row.getBoolean(i));
+ break;
+ case DOUBLE:
+ values.setField(i, name, row.getDouble(i));
+ break;
+ case FLOAT:
+ values.setField(i, name, row.getFloat(i));
+ break;
+ case INT8:
+ values.setField(i, name, row.getByte(i));
+ break;
+ case INT16:
+ values.setField(i, name, row.getShort(i));
+ break;
+ case INT32:
+ values.setField(i, name, row.getInt(i));
+ break;
+ case INT64:
+ case UNIXTIME_MICROS:
+ values.setField(i, name, row.getLong(i));
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal var type: " + type);
+ }
+ }
+ }
+ return values;
+ }
+
+
+ public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) {
+ final Operation operation = toOperation(table, writeMode);
+ final PartialRow partialRow = operation.getRow();
+
+ Schema schema = table.getSchema();
+ List<ColumnSchema> columns = schema.getColumns();
+
+ for (int i = 0; i < columns.size(); i++) {
+ String columnName = schema.getColumnByIndex(i).getName();
+ Object value = row.getField(i);
+ if (value == null) {
+ partialRow.setNull(columnName);
+ } else {
+ Type type = schema.getColumnByIndex(i).getType();
+ switch (type) {
+ case STRING:
+ partialRow.addString(columnName, (String) value);
+ break;
+ case FLOAT:
+ partialRow.addFloat(columnName, (Float) value);
+ break;
+ case INT8:
+ partialRow.addByte(columnName, (Byte) value);
+ break;
+ case INT16:
+ partialRow.addShort(columnName, (Short) value);
+ break;
+ case INT32:
+ partialRow.addInt(columnName, (Integer) value);
+ break;
+ case INT64:
+ partialRow.addLong(columnName, (Long) value);
+ break;
+ case DOUBLE:
+ partialRow.addDouble(columnName, (Double) value);
+ break;
+ case BOOL:
+ partialRow.addBoolean(columnName, (Boolean) value);
+ break;
+ case UNIXTIME_MICROS:
+ //*1000 to correctly create date on kudu
+ partialRow.addLong(columnName, ((Long) value) * 1000);
+ break;
+ case BINARY:
+ partialRow.addBinary(columnName, (byte[]) value);
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal var type: " + type);
+ }
+ }
+ }
+ return operation;
+ }
+
+ public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) {
+ switch (writeMode) {
+ case INSERT: return table.newInsert();
+ case UPDATE: return table.newUpdate();
+ case UPSERT: return table.newUpsert();
+ }
+ return table.newUpsert();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
new file mode 100644
index 0000000..03f5e5c
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.flink.types.Row;
+import org.apache.kudu.Schema;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.*;
+import java.util.stream.Stream;
+
+/**
+ * A Flink {@link Row} that additionally remembers the name of each positional field,
+ * so values can be read by name and mapped to and from plain objects via reflection.
+ */
+public class KuduRow extends Row {
+
+    // Field name -> position. Initialized here so that every constructor gets a map;
+    // previously only the arity constructor created it.
+    private Map<String, Integer> rowNames = new LinkedHashMap<>();
+
+    /**
+     * Creates an empty row with the given number of fields.
+     */
+    public KuduRow(Integer arity) {
+        super(arity);
+    }
+
+    /**
+     * Creates a row from the non-static, non-transient fields of {@code object}
+     * (including inherited ones) whose names are columns of {@code schema}. Each value
+     * is stored at the index of its column in the schema.
+     *
+     * @param object source of the values
+     * @param schema schema that decides which fields are kept and where they go
+     * @throws IllegalArgumentException if a field value cannot be read
+     */
+    public KuduRow(Object object, Schema schema) {
+        // Positions are schema indexes, so the row must be at least as wide as the schema.
+        super(Math.max(validFields(object), schema.getColumnCount()));
+        for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
+            basicValidation(c.getDeclaredFields())
+                    .filter(field -> hasColumn(schema, field.getName()))
+                    .forEach(cField -> {
+                        try {
+                            cField.setAccessible(true);
+                            setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object));
+                        } catch (IllegalAccessException e) {
+                            String error = String.format("Cannot get value for %s", cField.getName());
+                            throw new IllegalArgumentException(error, e);
+                        }
+                    });
+        }
+    }
+
+    /**
+     * Tells whether {@code schema} has a column with the given name.
+     * NOTE(review): the Kudu client appears to signal an unknown column by throwing
+     * IllegalArgumentException rather than returning null - both cases are handled.
+     */
+    private static boolean hasColumn(Schema schema, String name) {
+        try {
+            return schema.getColumn(name) != null;
+        } catch (IllegalArgumentException unknownColumn) {
+            return false;
+        }
+    }
+
+
+    /**
+     * Returns the value of the field registered under {@code name}.
+     *
+     * @throws IllegalArgumentException if no field was stored under that name
+     */
+    public Object getField(String name) {
+        Integer pos = rowNames.get(name);
+        if (pos == null) {
+            throw new IllegalArgumentException(String.format("Unknown field %s", name));
+        }
+        return super.getField(pos);
+    }
+
+    /**
+     * Stores {@code value} at {@code pos} and registers {@code name} for that position.
+     */
+    public void setField(int pos, String name, Object value) {
+        super.setField(pos, value);
+        this.rowNames.put(name, pos);
+    }
+
+    public boolean isNull(int pos) {
+        return getField(pos) == null;
+    }
+
+    /** Counts the non-static, non-transient fields declared by the object's class hierarchy. */
+    private static int validFields(Object object) {
+        long validField = 0L;
+        for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
+            validField += basicValidation(c.getDeclaredFields()).count();
+        }
+        return (int) validField;
+    }
+
+    private static Stream<Field> basicValidation(Field[] fields) {
+        return Arrays.stream(fields)
+                .filter(cField -> !Modifier.isStatic(cField.getModifiers()))
+                .filter(cField -> !Modifier.isTransient(cField.getModifiers()));
+    }
+
+    /**
+     * @return the named fields as a name-to-value map, ordered by field position
+     */
+    public Map<String,Object> blindMap() {
+        Map<String,Object> toRet = new LinkedHashMap<>();
+        rowNames.entrySet().stream()
+                .sorted(Comparator.comparing(Map.Entry::getValue))
+                .forEach(entry -> toRet.put(entry.getKey(), super.getField(entry.getValue())));
+        return toRet;
+    }
+
+    /**
+     * Creates an instance of {@code clazz} through its public no-argument constructor
+     * and copies every non-null named field of this row into the equally named
+     * non-static, non-transient field of the instance (inherited fields included).
+     *
+     * @throws IllegalArgumentException if the instance cannot be created or a field
+     *                                  cannot be written
+     */
+    public <P> P blind(Class<P> clazz) {
+        P o = createInstance(clazz);
+
+        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+            Field[] fields = c.getDeclaredFields();
+            for (Field cField : fields) {
+                try {
+                    if(rowNames.containsKey(cField.getName())
+                            && !Modifier.isStatic(cField.getModifiers())
+                            && !Modifier.isTransient(cField.getModifiers())) {
+
+                        cField.setAccessible(true);
+                        Object value = getField(cField.getName());
+                        if (value != null) {
+                            if (cField.getType() == Long.class && value.getClass() == Date.class) {
+                                // A Date going into a Long field is stored as its epoch milliseconds.
+                                cField.set(o, ((Date) value).getTime());
+                            } else {
+                                cField.set(o, value);
+                            }
+                        }
+                    }
+                } catch (IllegalAccessException e) {
+                    String error = String.format("Cannot get value for %s", cField.getName());
+                    throw new IllegalArgumentException(error, e);
+                }
+            }
+        }
+
+        return o;
+
+    }
+
+
+    private <P> P createInstance(Class<P> clazz) {
+        try {
+            return clazz.getConstructor().newInstance();
+        } catch (ReflectiveOperationException e) {
+            String error = String.format("Cannot create instance for %s", clazz.getSimpleName());
+            throw new IllegalArgumentException(error, e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return blindMap().toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
new file mode 100644
index 0000000..dfea382
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class KuduTableInfo implements Serializable {
+
+ private static final Integer DEFAULT_REPLICAS = 1;
+ private static final boolean DEFAULT_CREATE_IF_NOT_EXIST = false;
+
+ private Integer replicas;
+ private String name;
+ private boolean createIfNotExist;
+ private List<KuduColumnInfo> columns;
+
+ private KuduTableInfo(String name){
+ this.name = name;
+ this.replicas = DEFAULT_REPLICAS;
+ this.createIfNotExist = DEFAULT_CREATE_IF_NOT_EXIST;
+ this.columns = new ArrayList<>();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Schema getSchema() {
+ if(hasNotColumns()) return null;
+ List<ColumnSchema> schemaColumns = new ArrayList<>();
+ for(KuduColumnInfo column : columns){
+ schemaColumns.add(column.columnSchema());
+ }
+ return new Schema(schemaColumns);
+ }
+
+ public boolean createIfNotExist() {
+ return createIfNotExist;
+ }
+
+ public CreateTableOptions getCreateTableOptions() {
+ CreateTableOptions options = new CreateTableOptions();
+ if(replicas!=null){
+ options.setNumReplicas(replicas);
+ }
+ if(hasColummns()) {
+ List<String> rangeKeys = new ArrayList<>();
+ List<String> hashKeys = new ArrayList<>();
+ for(KuduColumnInfo column : columns){
+ if(column.isRangeKey()){
+ rangeKeys.add(column.name());
+ }
+ if(column.isHashKey()){
+ hashKeys.add(column.name());
+ }
+ }
+ options.setRangePartitionColumns(rangeKeys);
+ options.addHashPartitions(hashKeys, replicas*2);
+ }
+
+ return options;
+ }
+
+ public boolean hasNotColumns(){
+ return !hasColummns();
+ }
+ public boolean hasColummns(){
+ return (columns!=null && columns.size()>0);
+ }
+
+ public static class Builder {
+ KuduTableInfo table;
+
+ private Builder(String name) {
+ table = new KuduTableInfo(name);
+ }
+
+ public static Builder create(String name) {
+ return new Builder(name);
+ }
+
+ public static Builder open(String name) {
+ return new Builder(name);
+ }
+
+ public Builder createIfNotExist(boolean createIfNotExist) {
+ this.table.createIfNotExist = createIfNotExist;
+ return this;
+ }
+
+ public Builder replicas(int replicas) {
+ if (replicas == 0) return this;
+ this.table.replicas = replicas;
+ return this;
+ }
+
+ public Builder columns(List<KuduColumnInfo> columns) {
+ if(columns==null) return this;
+ this.table.columns.addAll(columns);
+ return this;
+ }
+
+ public Builder addColumn(KuduColumnInfo column) {
+ if(column==null) return this;
+ this.table.columns.add(column);
+ return this;
+ }
+
+ public KuduTableInfo build() {
+ return table;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
new file mode 100644
index 0000000..8cfc102
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link KuduInputFormat}, run against the cluster provided by
+ * {@link KuduDatabase}.
+ */
+public class KuduInputFormatTest extends KuduDatabase {
+
+    @Test
+    public void testInvalidKuduMaster() throws IOException {
+        KuduTableInfo tableInfo = booksTableInfo("books", false);
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(null, tableInfo));
+    }
+
+    @Test
+    public void testInvalidTableInfo() throws IOException {
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduInputFormat(hostsCluster, null));
+    }
+
+    @Test
+    public void testInputFormat() throws Exception {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        setUpDatabase(tableInfo);
+        try {
+            List<KuduRow> rows = readRows(tableInfo);
+            Assertions.assertEquals(5, rows.size());
+        } finally {
+            // always drop the fixed-name table so a failure here cannot leak into other tests
+            cleanDatabase(tableInfo);
+        }
+    }
+
+    @Test
+    public void testInputFormatWithProjection() throws Exception {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        setUpDatabase(tableInfo);
+        try {
+            List<KuduRow> rows = readRows(tableInfo, "title", "id");
+            Assertions.assertEquals(5, rows.size());
+
+            for (KuduRow row : rows) {
+                Assertions.assertEquals(2, row.getArity());
+            }
+        } finally {
+            // always drop the fixed-name table so a failure here cannot leak into other tests
+            cleanDatabase(tableInfo);
+        }
+    }
+
+    /**
+     * Reads every row of the given table through a {@link KuduInputFormat}.
+     *
+     * @param tableInfo the table to read
+     * @param fieldProjection the column names passed to {@code withTableProjections}
+     * @return the non-null records returned by the input format, over all splits
+     * @throws Exception if creating the splits or reading fails
+     */
+    public static List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
+        KuduInputFormat inputFormat = new KuduInputFormat(hostsCluster, tableInfo)
+                .withTableProjections(fieldProjection);
+
+        KuduInputFormat.KuduInputSplit[] splits = inputFormat.createInputSplits(1);
+        List<KuduRow> rows = new ArrayList<>();
+        try {
+            for (KuduInputFormat.KuduInputSplit split : splits) {
+                inputFormat.open(split);
+                while (!inputFormat.reachedEnd()) {
+                    KuduRow row = inputFormat.nextRecord(new KuduRow(5));
+                    if (row != null) {
+                        rows.add(row);
+                    }
+                }
+            }
+        } finally {
+            // release the input format even when reading fails half way
+            inputFormat.close();
+        }
+
+        return rows;
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
new file mode 100644
index 0000000..6eb5ebe
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests for {@link KuduOutputFormat}, run against the cluster provided by
+ * {@link KuduDatabase}. Every test works on a table with a random UUID name.
+ */
+public class KuduOuputFormatTest extends KuduDatabase {
+
+    @Test
+    public void testInvalidKuduMaster() throws IOException {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo));
+    }
+
+    @Test
+    public void testInvalidTableInfo() throws IOException {
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null));
+    }
+
+    @Test
+    public void testNotTableExist() throws IOException {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo);
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0, 1));
+    }
+
+    @Test
+    public void testOutputWithStrongConsistency() throws Exception {
+
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
+        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo)
+                .withStrongConsistency();
+        outputFormat.open(0, 1);
+
+        try {
+            for (KuduRow kuduRow : booksDataRow()) {
+                outputFormat.writeRecord(kuduRow);
+            }
+            outputFormat.close();
+
+            List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+            Assertions.assertEquals(5, rows.size());
+        } finally {
+            // drop the table even when a write or an assertion fails
+            cleanDatabase(tableInfo);
+        }
+    }
+
+    @Test
+    public void testOutputWithEventualConsistency() throws Exception {
+
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
+        KuduOutputFormat outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo)
+                .withEventualConsistency();
+        outputFormat.open(0, 1);
+
+        try {
+            for (KuduRow kuduRow : booksDataRow()) {
+                outputFormat.writeRecord(kuduRow);
+            }
+
+            // sleep to allow eventual consistency to finish
+            Thread.sleep(1000);
+
+            outputFormat.close();
+
+            List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+            Assertions.assertEquals(5, rows.size());
+        } finally {
+            // drop the table even when a write or an assertion fails
+            cleanDatabase(tableInfo);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
new file mode 100644
index 0000000..9e9ae93
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests for {@link KuduSink}, run against the cluster provided by
+ * {@link KuduDatabase}. Every test works on a table with a random UUID name.
+ */
+public class KuduSinkTest extends KuduDatabase {
+
+    @Test
+    public void testInvalidKuduMaster() throws IOException {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+        // exercise the sink itself, not KuduOutputFormat
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo));
+    }
+
+    @Test
+    public void testInvalidTableInfo() throws IOException {
+        // exercise the sink itself, not KuduOutputFormat
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(hostsCluster, null));
+    }
+
+    @Test
+    public void testNotTableExist() throws IOException {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo);
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
+    }
+
+    @Test
+    public void testOutputWithStrongConsistency() throws Exception {
+
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo)
+                .withStrongConsistency();
+        sink.open(new Configuration());
+
+        try {
+            for (KuduRow kuduRow : booksDataRow()) {
+                sink.invoke(kuduRow);
+            }
+            sink.close();
+
+            List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+            Assertions.assertEquals(5, rows.size());
+        } finally {
+            // drop the table created for this test, as the output format tests do
+            cleanDatabase(tableInfo);
+        }
+    }
+
+    @Test
+    public void testOutputWithEventualConsistency() throws Exception {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo)
+                .withEventualConsistency();
+        sink.open(new Configuration());
+
+        try {
+            for (KuduRow kuduRow : booksDataRow()) {
+                sink.invoke(kuduRow);
+            }
+
+            // sleep to allow eventual consistency to finish
+            Thread.sleep(1000);
+
+            sink.close();
+
+            List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+            Assertions.assertEquals(5, rows.size());
+        } finally {
+            // drop the table created for this test, as the output format tests do
+            cleanDatabase(tableInfo);
+        }
+    }
+
+}