You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/09/14 02:34:58 UTC

[2/2] bahir-flink git commit: [BAHIR-99] kudu connector

[BAHIR-99] kudu connector


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/c760e3cf
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/c760e3cf
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/c760e3cf

Branch: refs/heads/master
Commit: c760e3cfbd23c6c550800a07ef652e7f28e3f213
Parents: ca795cc
Author: Joao Boto <bo...@boto.pro>
Authored: Wed Jul 25 20:17:36 2018 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Sep 13 19:32:13 2018 -0700

----------------------------------------------------------------------
 .travis.yml                                     |  21 +-
 flink-connector-kudu/README.md                  |  98 +++++++++
 flink-connector-kudu/dockers/docker-compose.yml |  92 ++++++++
 flink-connector-kudu/dockers/role/Dockerfile    |  41 ++++
 .../dockers/role/docker-entrypoint.sh           |  69 ++++++
 flink-connector-kudu/dockers/run_kudu_tests.sh  |  68 ++++++
 flink-connector-kudu/dockers/start-images.sh    |  42 ++++
 flink-connector-kudu/dockers/stop-images.sh     |  33 +++
 flink-connector-kudu/pom.xml                    | 103 +++++++++
 .../connectors/kudu/KuduInputFormat.java        | 218 +++++++++++++++++++
 .../connectors/kudu/KuduOutputFormat.java       | 110 ++++++++++
 .../streaming/connectors/kudu/KuduSink.java     | 106 +++++++++
 .../kudu/connector/KuduColumnInfo.java          | 161 ++++++++++++++
 .../kudu/connector/KuduConnector.java           | 133 +++++++++++
 .../kudu/connector/KuduFilterInfo.java          | 173 +++++++++++++++
 .../connectors/kudu/connector/KuduMapper.java   | 146 +++++++++++++
 .../connectors/kudu/connector/KuduRow.java      | 137 ++++++++++++
 .../kudu/connector/KuduTableInfo.java           | 133 +++++++++++
 .../connectors/kudu/KuduInputFormatTest.java    |  91 ++++++++
 .../connectors/kudu/KuduOuputFormatTest.java    |  93 ++++++++
 .../streaming/connectors/kudu/KuduSinkTest.java |  89 ++++++++
 .../connectors/kudu/connector/KuduDatabase.java |  89 ++++++++
 pom.xml                                         |   3 +-
 23 files changed, 2247 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 083e75d..691667c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,20 +28,39 @@ before_cache:
 
 language: java
 
+services:
+  - docker
+
 matrix:
   include:
     - jdk: oraclejdk8
       env:
         - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
+        - MAVEN_PROFILE="default"
         - CACHE_NAME=JDK8_F130_A
     - jdk: openjdk8
       env:
         - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
         - CACHE_NAME=JDK8_F130_C
+        - MAVEN_PROFILE="default"
+        - CACHE_NAME=JDK8_F130_B
+    - jdk: openjdk8
+      env:
+        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
+        - MAVEN_PROFILE="test-kudu"
+        - CACHE_NAME=JDK8_F130_KUDU
 
 before_install:
   - ./dev/change-scala-version.sh $SCALA_VERSION
 
 install: true
 
-script: mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
+script:
+  - |
+    if [[ $MAVEN_PROFILE == "default" ]]; then
+      mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
+    fi
+  - |
+    if [[ $MAVEN_PROFILE == "test-kudu" ]]; then
+      flink-connector-kudu/dockers/run_kudu_tests.sh
+    fi

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
new file mode 100644
index 0000000..af2985b
--- /dev/null
+++ b/flink-connector-kudu/README.md
@@ -0,0 +1,98 @@
+# Flink Kudu Connector
+
+This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). To use this connector, add the
+following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-kudu_2.11</artifactId>
+      <version>1.1-SNAPSHOT</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* (last stable version).
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html).
+
+## Installing Kudu
+
+Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html).
+Optionally, you can use the Docker images provided in the `dockers` folder.
+
+## KuduInputFormat
+
+```
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Builder
+        .create("books")
+        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+        .build();
+    
+// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips
+env.createInput(new KuduInputFormat("172.25.0.6", tableInfo))
+        .count();
+        
+env.execute();
+```
+
+## KuduOutputFormat
+
+```
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Builder
+        .create("books")
+        .createIfNotExist(true)
+        .replicas(1)
+        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+        .build();
+
+...
+
+env.fromCollection(books)
+        .output(new KuduOutputFormat<>("172.25.0.6", tableInfo));
+
+env.execute();
+```
+
+## KuduSink
+
+```
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.setParallelism(PARALLELISM);
+
+// create a table info object
+KuduTableInfo tableInfo = KuduTableInfo.Builder
+        .create("books")
+        .createIfNotExist(true)
+        .replicas(1)
+        .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+        .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+        .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+        .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+        .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+        .build();
+
+...
+
+env.fromCollection(books)
+    .addSink(new KuduSink<>("172.25.0.6", tableInfo));
+
+env.execute();
+```

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/docker-compose.yml b/flink-connector-kudu/dockers/docker-compose.yml
new file mode 100644
index 0000000..d2c95bb
--- /dev/null
+++ b/flink-connector-kudu/dockers/docker-compose.yml
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+version: '2'
+
+services:
+
+  kudu-master:
+    image: eskabetxe/kudu
+    container_name: kudu-master
+    hostname: 172.25.0.6
+    ports:
+      - "8051:8051"
+    volumes:
+      - /var/lib/kudu/master
+    command: master
+    networks:
+      mynet:
+        ipv4_address: 172.25.0.6
+
+  kudu-server1:
+    image: eskabetxe/kudu
+    container_name: kudu-server1
+    hostname: 172.25.0.7
+    environment:
+      - KUDU_MASTER=172.25.0.6
+    ports:
+      - "8054:8050"
+    volumes:  # must match the tserver directory used by docker-entrypoint.sh
+      - /var/lib/kudu/tserver
+    command: tserver
+    networks:
+      mynet:
+        ipv4_address: 172.25.0.7
+    links:
+      - kudu-master
+
+  kudu-server2:
+    image: eskabetxe/kudu
+    container_name: kudu-server2
+    hostname: 172.25.0.8
+    environment:
+      - KUDU_MASTER=172.25.0.6
+    ports:
+      - "8052:8050"
+    volumes:  # must match the tserver directory used by docker-entrypoint.sh
+      - /var/lib/kudu/tserver
+    command: tserver
+    networks:
+      mynet:
+        ipv4_address: 172.25.0.8
+    links:
+      - kudu-master
+
+  kudu-server3:
+    image: eskabetxe/kudu
+    container_name: kudu-server3
+    hostname: 172.25.0.9
+    environment:
+      - KUDU_MASTER=172.25.0.6
+    ports:
+      - "8053:8050"
+    volumes:  # must match the tserver directory used by docker-entrypoint.sh
+      - /var/lib/kudu/tserver
+    command: tserver
+    networks:
+      mynet:
+        ipv4_address: 172.25.0.9
+    links:
+      - kudu-master
+
+networks:
+  mynet:
+    driver: bridge
+    ipam:
+      config:
+        - subnet: 172.25.0.0/24
+          IPRange: 172.25.0.2/24,
+          gateway: 172.25.0.1

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/role/Dockerfile
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/role/Dockerfile b/flink-connector-kudu/dockers/role/Dockerfile
new file mode 100644
index 0000000..b14b087
--- /dev/null
+++ b/flink-connector-kudu/dockers/role/Dockerfile
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM bitnami/minideb:jessie
+MAINTAINER eskabetxe
+
+RUN set -x \
+    && apt-get update \
+    && apt-get install -y --no-install-recommends \
+	   bzip2 unzip xz-utils wget \
+    && cd /etc/apt/sources.list.d \
+    && wget -qO - http://archive.cloudera.com/kudu/debian/jessie/amd64/kudu/archive.key | apt-key add - \
+    && wget http://archive.cloudera.com/kudu/debian/jessie/amd64/kudu/cloudera.list \
+    && apt-get update \
+    && apt-get install  --no-install-recommends -y \
+       kudu kudu-master kudu-tserver libkuduclient0 libkuduclient-dev \
+    && rm -rf /var/lib/apt/lists/* \
+    && apt-get autoclean
+
+VOLUME /var/lib/kudu/master /var/lib/kudu/tserver
+
+COPY docker-entrypoint.sh /
+RUN chmod a+x /docker-entrypoint.sh
+
+ENTRYPOINT ["/docker-entrypoint.sh"]
+EXPOSE 8050 8051 7050 7051
+#CMD ["help"]

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/role/docker-entrypoint.sh
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/role/docker-entrypoint.sh b/flink-connector-kudu/dockers/role/docker-entrypoint.sh
new file mode 100644
index 0000000..770850c
--- /dev/null
+++ b/flink-connector-kudu/dockers/role/docker-entrypoint.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -e
+
+function do_help {  # Print the supported entrypoint commands, then exit 0.
+  echo HELP:
+  echo "Supported commands:"
+  echo "   master              - Start a Kudu Master"
+  echo "   tserver             - Start a Kudu TServer"
+  echo "   single              - Start a Kudu Master+TServer in one container"
+  echo "   kudu                - Run the Kudu CLI"
+  echo "   help                - print useful information and exit"
+  echo ""
+  echo "Other commands can be specified to run shell commands."
+  echo "Set the environment variable KUDU_OPTS to pass additional"
+  echo "arguments to the kudu process. DEFAULT_KUDU_OPTS contains"
+  echo "a recommended base set of options."
+
+  exit 0
+}
+
+DEFAULT_KUDU_OPTS="-logtostderr \
+ -fs_wal_dir=/var/lib/kudu/$1 \
+ -fs_data_dirs=/var/lib/kudu/$1 \
+ -use_hybrid_clock=false"
+
+KUDU_OPTS=${KUDU_OPTS:-${DEFAULT_KUDU_OPTS}}
+
+if [ "$1" = 'master' ]; then
+  exec kudu-master -fs_wal_dir /var/lib/kudu/master ${KUDU_OPTS}
+elif [ "$1" = 'tserver' ]; then
+  exec kudu-tserver -fs_wal_dir /var/lib/kudu/tserver \
+  -tserver_master_addrs ${KUDU_MASTER} ${KUDU_OPTS}
+elif [ "$1" = 'single' ]; then
+  KUDU_MASTER=boot2docker
+  KUDU_MASTER_OPTS="-logtostderr \
+   -fs_wal_dir=/var/lib/kudu/master \
+   -fs_data_dirs=/var/lib/kudu/master \
+   -use_hybrid_clock=false"
+  KUDU_TSERVER_OPTS="-logtostderr \
+   -fs_wal_dir=/var/lib/kudu/tserver \
+   -fs_data_dirs=/var/lib/kudu/tserver \
+   -use_hybrid_clock=false"
+  exec kudu-master -fs_wal_dir /var/lib/kudu/master ${KUDU_MASTER_OPTS} &
+  sleep 5
+  exec kudu-tserver -fs_wal_dir /var/lib/kudu/tserver -tserver_master_addrs ${KUDU_MASTER} ${KUDU_TSERVER_OPTS}
+elif [ "$1" = 'kudu' ]; then
+  shift; # Remove first arg and pass remainder to kudu cli
+  exec kudu "$@"
+elif [ "$1" = 'help' ]; then
+  do_help
+fi
+
+exec "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/run_kudu_tests.sh
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/run_kudu_tests.sh b/flink-connector-kudu/dockers/run_kudu_tests.sh
new file mode 100755
index 0000000..58593d6
--- /dev/null
+++ b/flink-connector-kudu/dockers/run_kudu_tests.sh
@@ -0,0 +1,68 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+# Runs all tests with Kudu server in docker containers.
+
+set -euo pipefail -x
+
+# http://stackoverflow.com/questions/3572030/bash-script-absolute-path-with-osx
+function absolutepath() {
+    [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}"
+}
+
+SCRIPT_DIR=$(dirname $(absolutepath "$0"))
+
+PROJECT_ROOT="${SCRIPT_DIR}/../.."
+
+DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml"
+
+
+function build_image() {
+   docker build -t eskabetxe/kudu ${SCRIPT_DIR}/role
+
+   #docker-compose build -f "${DOCKER_COMPOSE_LOCATION}"
+}
+
+function start_docker_container() {
+  # stop already running containers
+  #docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down || true
+
+  # start containers
+  docker-compose -f "${DOCKER_COMPOSE_LOCATION}" up -d
+}
+
+function cleanup_docker_container() {
+  docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down
+  #true
+}
+
+build_image
+
+start_docker_container
+
+#run product tests
+pushd ${PROJECT_ROOT}
+set +e
+mvn test -pl flink-connector-kudu -P test-kudu
+EXIT_CODE=$?
+set -e
+popd
+
+cleanup_docker_container
+
+exit ${EXIT_CODE}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/start-images.sh
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/start-images.sh b/flink-connector-kudu/dockers/start-images.sh
new file mode 100755
index 0000000..fad3de6
--- /dev/null
+++ b/flink-connector-kudu/dockers/start-images.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function absolutepath() {
+    [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}"
+}
+
+function build_image() {
+   docker build -t eskabetxe/kudu ${SCRIPT_DIR}/role
+
+   #docker-compose build -f "${DOCKER_COMPOSE_LOCATION}"
+}
+
+function start_docker_container() {
+  # stop already running containers
+  docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down || true
+
+  # start containers
+  docker-compose -f "${DOCKER_COMPOSE_LOCATION}" up -d
+}
+
+SCRIPT_DIR=$(dirname $(absolutepath "$0"))
+DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml"
+
+build_image
+
+start_docker_container

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/stop-images.sh
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/dockers/stop-images.sh b/flink-connector-kudu/dockers/stop-images.sh
new file mode 100755
index 0000000..9ae52c1
--- /dev/null
+++ b/flink-connector-kudu/dockers/stop-images.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function absolutepath() {
+    [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}"
+}
+
+function cleanup_docker_container() {
+  docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down
+  # eskabetxe/kudu is an image tag, so remove it with rmi (rm only handles containers).
+  docker rmi eskabetxe/kudu
+}
+
+SCRIPT_DIR=$(dirname $(absolutepath "$0"))
+DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml"
+
+cleanup_docker_container
+

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
new file mode 100644
index 0000000..348371b
--- /dev/null
+++ b/flink-connector-kudu/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink-parent_2.11</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-kudu_2.11</artifactId>
+  <name>flink-connector-kudu</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <kudu.version>1.7.1</kudu.version>
+    <junit.version>5.2.0</junit.version>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${kudu.version}</version>
+    </dependency>
+
+    <!--test dependencies-->
+
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${kudu.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>default</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/*Test.java</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>test-kudu</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
new file mode 100644
index 0000000..617e317
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.streaming.connectors.kudu.connector.*;
+import org.apache.flink.util.Preconditions;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.KuduInputSplit> {
+
+    private String kuduMasters;
+    private KuduTableInfo tableInfo;
+    private List<KuduFilterInfo> tableFilters;
+    private List<String> tableProjections;
+    private Long rowsLimit;
+    private boolean endReached;
+
+    private transient KuduConnector tableContext;
+    private transient KuduScanner scanner;
+    private transient RowResultIterator resultIterator;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class);
+
+    public KuduInputFormat(String kuduMasters, KuduTableInfo tableInfo) {
+        Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
+        this.kuduMasters = kuduMasters;
+
+        Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
+        this.tableInfo = tableInfo;
+
+        this.endReached = false;
+    }
+
+    public KuduInputFormat withTableFilters(KuduFilterInfo... tableFilters) {
+        return withTableFilters(Arrays.asList(tableFilters));
+    }
+
+    public KuduInputFormat withTableFilters(List<KuduFilterInfo> tableFilters) {
+        this.tableFilters = tableFilters;
+        return this;
+    }
+
+    public KuduInputFormat withTableProjections(String... tableProjections) {
+        return withTableProjections(Arrays.asList(tableProjections));
+    }
+    public KuduInputFormat withTableProjections(List<String> tableProjections) {
+        this.tableProjections = tableProjections;
+        return this;
+    }
+
+    public KuduInputFormat withRowsLimit(Long rowsLimit) {
+        this.rowsLimit = rowsLimit;
+        return this;
+    }
+
+    @Override
+    public void configure(Configuration parameters) {
+
+    }
+
+    @Override
+    public void open(KuduInputSplit split) throws IOException {
+        endReached = false;
+        startTableContext();
+
+        scanner = tableContext.scanner(split.getScanToken());
+        resultIterator = scanner.nextRows();
+    }
+
+    @Override
+    public void close() {
+        if (scanner != null) {
+            try {
+                scanner.close();
+            } catch (KuduException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+        return cachedStatistics;
+    }
+
+    @Override
+    public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) {
+        return new LocatableInputSplitAssigner(inputSplits);
+    }
+
+    private void startTableContext() throws IOException {
+        if (tableContext == null) {
+            tableContext = new KuduConnector(kuduMasters, tableInfo);
+        }
+    }
+
+    @Override
+    public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        startTableContext();
+        Preconditions.checkNotNull(tableContext,"tableContext should not be null");
+
+        List<KuduScanToken> tokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit);
+
+        KuduInputSplit[] splits = new KuduInputSplit[tokens.size()];
+
+        for (int i = 0; i < tokens.size(); i++) {
+            KuduScanToken token = tokens.get(i);
+
+            List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size());
+
+            for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
+                locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort()));
+            }
+
+            KuduInputSplit split = new KuduInputSplit(
+                    token.serialize(),
+                    i,
+                    locations.toArray(new String[locations.size()])
+            );
+            splits[i] = split;
+        }
+
+        if (splits.length < minNumSplits) {
+            LOG.warn(" The minimum desired number of splits with your configured parallelism level " +
+                            "is {}. Current kudu splits = {}. {} instances will remain idle.",
+                    minNumSplits,
+                    splits.length,
+                    (minNumSplits - splits.length)
+            );
+        }
+
+        return splits;
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return endReached;
+    }
+
+    @Override
+    public KuduRow nextRecord(KuduRow reuse) throws IOException {
+        // check that current iterator has next rows
+        if (this.resultIterator.hasNext()) {
+            RowResult row = this.resultIterator.next();
+            return KuduMapper.toKuduRow(row);
+        }
+        // if not, check that current scanner has more iterators
+        else if (scanner.hasMoreRows()) {
+            this.resultIterator = scanner.nextRows();
+            return nextRecord(reuse);
+        }
+        else {
+            endReached = true;
+        }
+        return null;
+    }
+
+    /**
+     * Returns an endpoint address in the following format: {@code host:port}
+     *
+     * @param host Hostname
+     * @param port Port
+     * @return Formatted URL
+     */
+    private String getLocation(String host, Integer port) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(host).append(":").append(port);
+        return builder.toString();
+    }
+
+
+    public static class KuduInputSplit extends LocatableInputSplit {
+        // Static on purpose: a split must serialize without dragging along the enclosing format.
+        private final byte[] scanToken;
+
+        /**
+         * Creates a new KuduInputSplit
+         * @param scanToken the serialized Kudu scan token this split reads from
+         * @param splitNumber the number of the input split
+         * @param hostnames The names of the hosts storing the data this input split refers to.
+         */
+        public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) {
+            super(splitNumber, hostnames);
+            this.scanToken = scanToken;
+        }
+
+        public byte[] getScanToken() {
+            return scanToken;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
new file mode 100644
index 0000000..5c23f36
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Flink {@link OutputFormat} that writes {@link KuduRow} records to a Kudu table through a
+ * {@link KuduConnector}.
+ *
+ * <p>A new instance uses strong consistency and the upsert write mode; the {@code with...}
+ * methods change those settings and return this instance so calls can be chained.
+ */
+public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
+
+    private String kuduMasters;
+    private KuduTableInfo tableInfo;
+    private KuduConnector.Consistency consistency;
+    private KuduConnector.WriteMode writeMode;
+
+    // Created in open(); transient, so it is not part of the serialized format.
+    private transient KuduConnector tableContext;
+
+    /**
+     * @param kuduMasters Kudu master addresses handed to the connector; must not be null
+     * @param tableInfo   description of the target table; must not be null
+     */
+    public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo) {
+        this.kuduMasters = Preconditions.checkNotNull(kuduMasters, "kuduMasters could not be null");
+        this.tableInfo = Preconditions.checkNotNull(tableInfo, "tableInfo could not be null");
+        this.consistency = KuduConnector.Consistency.STRONG;
+        this.writeMode = KuduConnector.WriteMode.UPSERT;
+    }
+
+    /** Switches writes to {@link KuduConnector.Consistency#EVENTUAL}. */
+    public KuduOutputFormat<OUT> withEventualConsistency() {
+        return usingConsistency(KuduConnector.Consistency.EVENTUAL);
+    }
+
+    /** Switches writes to {@link KuduConnector.Consistency#STRONG}. */
+    public KuduOutputFormat<OUT> withStrongConsistency() {
+        return usingConsistency(KuduConnector.Consistency.STRONG);
+    }
+
+    /** Selects {@link KuduConnector.WriteMode#UPSERT} for written rows. */
+    public KuduOutputFormat<OUT> withUpsertWriteMode() {
+        return usingWriteMode(KuduConnector.WriteMode.UPSERT);
+    }
+
+    /** Selects {@link KuduConnector.WriteMode#INSERT} for written rows. */
+    public KuduOutputFormat<OUT> withInsertWriteMode() {
+        return usingWriteMode(KuduConnector.WriteMode.INSERT);
+    }
+
+    /** Selects {@link KuduConnector.WriteMode#UPDATE} for written rows. */
+    public KuduOutputFormat<OUT> withUpdateWriteMode() {
+        return usingWriteMode(KuduConnector.WriteMode.UPDATE);
+    }
+
+    // Shared setter behind the public consistency switches.
+    private KuduOutputFormat<OUT> usingConsistency(KuduConnector.Consistency level) {
+        this.consistency = level;
+        return this;
+    }
+
+    // Shared setter behind the public write-mode switches.
+    private KuduOutputFormat<OUT> usingWriteMode(KuduConnector.WriteMode mode) {
+        this.writeMode = mode;
+        return this;
+    }
+
+    /** No configuration is read from {@code parameters}. */
+    @Override
+    public void configure(Configuration parameters) {
+        // intentionally empty
+    }
+
+    /**
+     * Creates the connector on first use; an already existing connector is kept.
+     *
+     * @throws IOException if the connector cannot be created
+     */
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        if (tableContext == null) {
+            tableContext = new KuduConnector(kuduMasters, tableInfo);
+        }
+    }
+
+    /**
+     * Writes one row with the configured consistency and write mode.
+     *
+     * @throws IOException wrapping any exception raised by the connector
+     */
+    @Override
+    public void writeRecord(OUT kuduRow) throws IOException {
+        try {
+            tableContext.writeRow(kuduRow, consistency, writeMode);
+        } catch (Exception failure) {
+            throw new IOException(failure.getLocalizedMessage(), failure);
+        }
+    }
+
+    /**
+     * Closes the connector if one was created.
+     *
+     * @throws IOException wrapping any exception raised while closing
+     */
+    @Override
+    public void close() throws IOException {
+        if (this.tableContext != null) {
+            try {
+                this.tableContext.close();
+            } catch (Exception failure) {
+                throw new IOException(failure.getLocalizedMessage(), failure);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
new file mode 100644
index 0000000..120d5c5
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Flink sink that writes {@link KuduRow} records to a Kudu table through a
+ * {@link KuduConnector}.
+ *
+ * <p>A new instance uses strong consistency and the upsert write mode; the {@code with...}
+ * methods change those settings and return this instance so calls can be chained.
+ */
+public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> {
+
+    // Logger category is this class (it previously pointed at KuduOutputFormat by mistake).
+    private static final Logger LOG = LoggerFactory.getLogger(KuduSink.class);
+
+    private String kuduMasters;
+    private KuduTableInfo tableInfo;
+    private KuduConnector.Consistency consistency;
+    private KuduConnector.WriteMode writeMode;
+
+    // Created in open(); transient, so it is not part of the serialized sink.
+    private transient KuduConnector tableContext;
+
+
+    /**
+     * @param kuduMasters Kudu master addresses handed to the connector; must not be null
+     * @param tableInfo   description of the target table; must not be null
+     */
+    public KuduSink(String kuduMasters, KuduTableInfo tableInfo) {
+        Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null");
+        this.kuduMasters = kuduMasters;
+
+        Preconditions.checkNotNull(tableInfo,"tableInfo could not be null");
+        this.tableInfo = tableInfo;
+        this.consistency = KuduConnector.Consistency.STRONG;
+        this.writeMode = KuduConnector.WriteMode.UPSERT;
+    }
+
+    /** Switches writes to {@link KuduConnector.Consistency#EVENTUAL}. */
+    public KuduSink<OUT> withEventualConsistency() {
+        this.consistency = KuduConnector.Consistency.EVENTUAL;
+        return this;
+    }
+
+    /** Switches writes to {@link KuduConnector.Consistency#STRONG}. */
+    public KuduSink<OUT> withStrongConsistency() {
+        this.consistency = KuduConnector.Consistency.STRONG;
+        return this;
+    }
+
+    /** Selects {@link KuduConnector.WriteMode#UPSERT} for written rows. */
+    public KuduSink<OUT> withUpsertWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.UPSERT;
+        return this;
+    }
+
+    /** Selects {@link KuduConnector.WriteMode#INSERT} for written rows. */
+    public KuduSink<OUT> withInsertWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.INSERT;
+        return this;
+    }
+
+    /** Selects {@link KuduConnector.WriteMode#UPDATE} for written rows. */
+    public KuduSink<OUT> withUpdateWriteMode() {
+        this.writeMode = KuduConnector.WriteMode.UPDATE;
+        return this;
+    }
+
+    /**
+     * Creates the connector on first use.
+     *
+     * @throws IOException if the connector cannot be created
+     */
+    @Override
+    public void open(Configuration parameters) throws IOException {
+        startTableContext();
+    }
+
+    // Lazily creates the connector; an already existing connector is kept.
+    private void startTableContext() throws IOException {
+        if (tableContext != null) {
+            return;
+        }
+        tableContext = new KuduConnector(kuduMasters, tableInfo);
+    }
+
+
+    /**
+     * Writes one row with the configured consistency and write mode.
+     *
+     * @throws Exception an {@link IOException} wrapping any exception raised by the connector
+     */
+    @Override
+    public void invoke(OUT kuduRow) throws Exception {
+        try {
+            tableContext.writeRow(kuduRow, consistency, writeMode);
+        } catch (Exception e) {
+            throw new IOException(e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Closes the connector if one was created.
+     *
+     * @throws Exception an {@link IOException} wrapping any exception raised while closing
+     */
+    @Override
+    public void close() throws Exception {
+        if (this.tableContext == null) {
+            return;
+        }
+        try {
+            this.tableContext.close();
+        } catch (Exception e) {
+            throw new IOException(e.getLocalizedMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
new file mode 100644
index 0000000..4dfc0b8
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+
+import java.io.Serializable;
+
+/**
+ * Serializable description of one Kudu column, created through {@link Builder}.
+ *
+ * <p>{@link #columnSchema()} turns the stored settings into a Kudu {@link ColumnSchema}. The
+ * range-key and hash-key flags are not part of that schema; they are only exposed through
+ * {@link #isRangeKey()} and {@link #isHashKey()}.
+ */
+public class KuduColumnInfo implements Serializable {
+
+    private String name;
+    private Type type;
+    private boolean key;
+    private boolean rangeKey;
+    private boolean hashKey;
+    private boolean nullable;
+    private Object defaultValue;
+    private int blockSize;
+    private Encoding encoding;
+    private Compression compression;
+
+    // Defaults: no key flags, not nullable, no default value, block size 0,
+    // AUTO encoding and DEFAULT compression.
+    private KuduColumnInfo(String name, Type type) {
+        this.name = name;
+        this.type = type;
+        this.blockSize = 0;
+        this.key = false;
+        this.rangeKey = false;
+        this.hashKey = false;
+        this.nullable = false;
+        this.defaultValue = null;
+        this.encoding = Encoding.AUTO;
+        this.compression = Compression.DEFAULT;
+    }
+
+    // Field-by-field copy, used so that built columns do not share state with their builder.
+    private KuduColumnInfo(KuduColumnInfo other) {
+        this.name = other.name;
+        this.type = other.type;
+        this.blockSize = other.blockSize;
+        this.key = other.key;
+        this.rangeKey = other.rangeKey;
+        this.hashKey = other.hashKey;
+        this.nullable = other.nullable;
+        this.defaultValue = other.defaultValue;
+        this.encoding = other.encoding;
+        this.compression = other.compression;
+    }
+
+    /** @return the column name */
+    protected String name() {
+        return name;
+    }
+
+    /** @return whether the column was flagged as a range key */
+    protected boolean isRangeKey() {
+        return rangeKey;
+    }
+
+    /** @return whether the column was flagged as a hash key */
+    protected boolean isHashKey() {
+        return hashKey;
+    }
+
+    /**
+     * Builds the Kudu column schema from the stored name, type, key flag, nullability,
+     * default value, block size, encoding and compression.
+     *
+     * @return a new {@link ColumnSchema}
+     */
+    protected ColumnSchema columnSchema() {
+        return new ColumnSchema.ColumnSchemaBuilder(name, type)
+                    .key(key)
+                    .nullable(nullable)
+                    .defaultValue(defaultValue)
+                    .desiredBlockSize(blockSize)
+                    .encoding(encoding.encode)
+                    .compressionAlgorithm(compression.algorithm)
+                    .build();
+    }
+
+    /** Fluent builder for {@link KuduColumnInfo}; start with {@link #create(String, Type)}. */
+    public static class Builder {
+        private KuduColumnInfo column;
+
+        private Builder(String name, Type type) {
+            this.column = new KuduColumnInfo(name, type);
+        }
+
+        /**
+         * @param name column name
+         * @param type Kudu column type
+         * @return a builder preloaded with the default settings
+         */
+        public static Builder create(String name, Type type) {
+            return new Builder(name, type);
+        }
+
+        public Builder key(boolean key) {
+            this.column.key = key;
+            return this;
+        }
+
+        public Builder rangeKey(boolean rangeKey) {
+            this.column.rangeKey = rangeKey;
+            return this;
+        }
+
+        public Builder hashKey(boolean hashKey) {
+            this.column.hashKey = hashKey;
+            return this;
+        }
+
+        public Builder nullable(boolean nullable) {
+            this.column.nullable = nullable;
+            return this;
+        }
+
+        public Builder defaultValue(Object defaultValue) {
+            this.column.defaultValue = defaultValue;
+            return this;
+        }
+
+        public Builder desiredBlockSize(int blockSize) {
+            this.column.blockSize = blockSize;
+            return this;
+        }
+
+        public Builder encoding(Encoding encoding) {
+            this.column.encoding = encoding;
+            return this;
+        }
+
+        public Builder compressionAlgorithm(Compression compression) {
+            this.column.compression = compression;
+            return this;
+        }
+
+        /**
+         * @return a new column holding a snapshot of the current settings; later calls on
+         *         this builder do not affect columns that were already built
+         */
+        public KuduColumnInfo build() {
+            return new KuduColumnInfo(column);
+        }
+    }
+
+    /** Compression choices, each mapped to a Kudu {@link ColumnSchema.CompressionAlgorithm}. */
+    public enum Compression {
+        UNKNOWN(ColumnSchema.CompressionAlgorithm.UNKNOWN),
+        DEFAULT(ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION),
+        WITHOUT(ColumnSchema.CompressionAlgorithm.NO_COMPRESSION),
+        SNAPPY(ColumnSchema.CompressionAlgorithm.SNAPPY),
+        LZ4(ColumnSchema.CompressionAlgorithm.LZ4),
+        ZLIB(ColumnSchema.CompressionAlgorithm.ZLIB);
+
+        final ColumnSchema.CompressionAlgorithm algorithm;
+
+        Compression(ColumnSchema.CompressionAlgorithm algorithm) {
+            this.algorithm = algorithm;
+        }
+    }
+
+    /** Encoding choices, each mapped to a Kudu {@link ColumnSchema.Encoding}. */
+    public enum Encoding {
+        UNKNOWN(ColumnSchema.Encoding.UNKNOWN),
+        AUTO(ColumnSchema.Encoding.AUTO_ENCODING),
+        PLAIN(ColumnSchema.Encoding.PLAIN_ENCODING),
+        PREFIX(ColumnSchema.Encoding.PREFIX_ENCODING),
+        GROUP_VARINT(ColumnSchema.Encoding.GROUP_VARINT),
+        RLE(ColumnSchema.Encoding.RLE),
+        DICT(ColumnSchema.Encoding.DICT_ENCODING),
+        BIT_SHUFFLE(ColumnSchema.Encoding.BIT_SHUFFLE);
+
+        final ColumnSchema.Encoding encode;
+
+        Encoding(ColumnSchema.Encoding encode) {
+            this.encode = encode;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
new file mode 100644
index 0000000..0e2e6bc
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import com.stumbleupon.async.Callback;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kudu.client.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Wrapper around an {@link AsyncKuduClient} bound to a single Kudu table.
+ *
+ * <p>The constructor opens the table, or creates it when it is missing and the
+ * {@link KuduTableInfo} allows creation. The instance then offers scan-token creation,
+ * scanner deserialization, single-row writes and table deletion. {@link #close()} closes
+ * the underlying client.
+ */
+public class KuduConnector implements AutoCloseable {
+
+    private final Logger LOG = LoggerFactory.getLogger(this.getClass());
+
+    public enum Consistency {EVENTUAL, STRONG}
+    public enum WriteMode {INSERT,UPDATE,UPSERT}
+
+    private AsyncKuduClient client;
+    private KuduTable table;
+
+    /**
+     * @param kuduMasters Kudu master addresses passed to the client builder
+     * @param tableInfo   description of the table to open or create
+     * @throws IOException if the table cannot be opened or created
+     * @throws UnsupportedOperationException if the table is missing and must not be created
+     */
+    public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException {
+        client = client(kuduMasters);
+        try {
+            table = table(tableInfo);
+        } catch (IOException | RuntimeException e) {
+            // The caller never receives this instance, so nobody else could close the client.
+            closeClientAfterFailure(e);
+            throw e;
+        }
+    }
+
+    // Closes the client after a failed construction; a close failure is attached as suppressed.
+    private void closeClientAfterFailure(Throwable failure) {
+        try {
+            client.close();
+        } catch (Exception closeFailure) {
+            failure.addSuppressed(closeFailure);
+        }
+    }
+
+    private AsyncKuduClient client(String kuduMasters) {
+        return new AsyncKuduClient.AsyncKuduClientBuilder(kuduMasters).build();
+    }
+
+    // Opens the table if it exists, creates it if allowed, otherwise fails.
+    private KuduTable table(KuduTableInfo infoTable) throws IOException {
+        KuduClient syncClient = client.syncClient();
+
+        String tableName = infoTable.getName();
+        if (syncClient.tableExists(tableName)) {
+            return syncClient.openTable(tableName);
+        }
+        if (infoTable.createIfNotExist()) {
+            return syncClient.createTable(tableName, infoTable.getSchema(), infoTable.getCreateTableOptions());
+        }
+        throw new UnsupportedOperationException("table does not exist and is marked to not be created");
+    }
+
+    /**
+     * Deletes the table this connector is bound to.
+     *
+     * @return always {@code true}; failures surface as exceptions
+     * @throws IOException if the deletion fails
+     */
+    public boolean deleteTable() throws IOException {
+        String tableName = table.getName();
+        client.syncClient().deleteTable(tableName);
+        return true;
+    }
+
+    /**
+     * @param token serialized scan token
+     * @return a scanner deserialized from the token, using this connector's client
+     * @throws IOException if the token cannot be deserialized into a scanner
+     */
+    public KuduScanner scanner(byte[] token) throws IOException {
+        return KuduScanToken.deserializeIntoScanner(token, client.syncClient());
+    }
+
+    /**
+     * Builds scan tokens for the table.
+     *
+     * @param tableFilters     filters turned into predicates; ignored when null or empty
+     * @param tableProjections projected column names; ignored when null or empty
+     * @param rowLimit         row limit; ignored when null or not positive
+     * @return the scan tokens produced by the token builder
+     */
+    public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Long rowLimit) {
+        KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.syncClient().newScanTokenBuilder(table);
+
+        if (CollectionUtils.isNotEmpty(tableProjections)) {
+            tokenBuilder.setProjectedColumnNames(tableProjections);
+        }
+
+        if (CollectionUtils.isNotEmpty(tableFilters)) {
+            tableFilters.stream()
+                    .map(filter -> filter.toPredicate(table.getSchema()))
+                    .forEach(tokenBuilder::addPredicate);
+        }
+
+        if (rowLimit !=null && rowLimit > 0) {
+            tokenBuilder.limit(rowLimit);
+            // FIXME: https://issues.apache.org/jira/browse/KUDU-16
+            // Server side limit() operator for java-based scanners are not implemented yet
+        }
+
+        return tokenBuilder.build();
+    }
+
+    /**
+     * Writes a single row in its own session.
+     *
+     * <p>{@link Consistency#EVENTUAL} uses an asynchronous session and waits only for the
+     * session close; any other value uses a synchronous session.
+     *
+     * @param row         row to write
+     * @param consistency which kind of session to use
+     * @param writeMode   insert, update or upsert
+     * @return {@code true} when none of the responses returned by closing the session
+     *         carries a row error
+     * @throws Exception if applying, flushing or closing fails
+     */
+    public boolean writeRow(KuduRow row, Consistency consistency, WriteMode writeMode) throws Exception {
+        final Operation operation = KuduMapper.toOperation(table, writeMode, row);
+
+        // NOTE(review): the values returned by apply() and flush() are discarded in both
+        // branches; only the responses returned by close() are inspected for row errors.
+        if (Consistency.EVENTUAL.equals(consistency)) {
+            AsyncKuduSession session = client.newSession();
+            session.apply(operation);
+            session.flush();
+            return session.close().addCallback(new ResponseCallback()).join();
+        } else {
+            KuduSession session = client.syncClient().newSession();
+            session.apply(operation);
+            session.flush();
+            return processResponse(session.close());
+        }
+    }
+
+    /** Closes the underlying client, if any. */
+    @Override
+    public void close() throws Exception {
+        if (client == null) {
+            return;
+        }
+
+        client.close();
+    }
+
+    // Logs every row error found in the responses and reports whether there was none.
+    // Responses without a row error are skipped: their getRowError() is null and must not
+    // be dereferenced, and they do not make the write a failure.
+    private Boolean processResponse(List<OperationResponse> operationResponses) {
+        boolean isOk = true;
+        if (operationResponses == null) {
+            return isOk;
+        }
+        for (OperationResponse operationResponse : operationResponses) {
+            if (operationResponse != null && operationResponse.hasRowError()) {
+                isOk = false;
+                logResponseError(operationResponse.getRowError());
+            }
+        }
+        return isOk;
+    }
+
+    private void logResponseError(RowError error) {
+        LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString());
+    }
+
+    // Adapts processResponse to the callback interface used by the asynchronous session.
+    private class ResponseCallback implements Callback<Boolean, List<OperationResponse>> {
+        @Override
+        public Boolean call(List<OperationResponse> operationResponses) {
+            return processResponse(operationResponses);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
new file mode 100644
index 0000000..bd20fc8
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.KuduPredicate;
+
+import java.util.List;
+
+
+public class KuduFilterInfo {
+
+    private String column;
+    private FilterType type;
+    private Object value;
+
+    private KuduFilterInfo() { }
+
+    public KuduPredicate toPredicate(Schema schema) {
+        return toPredicate(schema.getColumn(this.column));
+    }
+    public KuduPredicate toPredicate(ColumnSchema column) {
+        KuduPredicate predicate;
+        switch (this.type) {
+            case IS_IN:
+                predicate = KuduPredicate.newInListPredicate(column, (List) this.value);
+                break;
+            case IS_NULL:
+                predicate = KuduPredicate.newIsNullPredicate(column);
+                break;
+            case IS_NOT_NULL:
+                predicate = KuduPredicate.newIsNotNullPredicate(column);
+                break;
+            default:
+                predicate = predicateComparator(column);
+                break;
+        }
+        return predicate;
+    }
+
+    private KuduPredicate predicateComparator(ColumnSchema column) {
+
+        KuduPredicate.ComparisonOp comparison = this.type.comparator;
+
+        KuduPredicate predicate;
+
+        switch (column.getType()) {
+            case STRING:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String)this.value);
+                break;
+            case FLOAT:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Float)this.value);
+                break;
+            case INT8:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Byte)this.value);
+                break;
+            case INT16:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Short)this.value);
+                break;
+            case INT32:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Integer)this.value);
+                break;
+            case INT64:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Long)this.value);
+                break;
+            case DOUBLE:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Double)this.value);
+                break;
+            case BOOL:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Boolean)this.value);
+                break;
+            case UNIXTIME_MICROS:
+                Long time = (Long)this.value;
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, time*1000);
+                break;
+            case BINARY:
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte[])this.value);
+                break;
+            default:
+                throw new IllegalArgumentException("Illegal var type: " + column.getType());
+        }
+        return predicate;
+    }
+
+    public static class Builder {
+        private KuduFilterInfo filter;
+
+        private Builder(String column) {
+            this.filter = new KuduFilterInfo();
+            this.filter.column = column;
+        }
+
+        public static Builder create(String column) {
+            return new Builder(column);
+        }
+
+        public Builder greaterThan(Object value) {
+            return filter(FilterType.GREATER, value);
+        }
+
+        public Builder lessThan(Object value) {
+            return filter(FilterType.LESS, value);
+        }
+
+        public Builder equalTo(Object value) {
+            return filter(FilterType.EQUAL, value);
+        }
+
+        public Builder greaterOrEqualTo(Object value) {
+            return filter(FilterType.GREATER_EQUAL, value);
+        }
+
+        public Builder lessOrEqualTo(Object value) {
+            return filter(FilterType.LESS_EQUAL, value);
+        }
+
+        public Builder isNotNull() {
+            return filter(FilterType.IS_NOT_NULL, null);
+        }
+
+        public Builder isNull() {
+            return filter(FilterType.IS_NULL, null);
+        }
+
+        public Builder isIn(List values) {
+            return filter(FilterType.IS_IN, values);
+        }
+
+        public Builder filter(FilterType type, Object value) {
+            this.filter.type = type;
+            this.filter.value = value;
+            return this;
+        }
+
+        public KuduFilterInfo build() {
+            return filter;
+        }
+    }
+
+    public enum FilterType {
+        GREATER(KuduPredicate.ComparisonOp.GREATER),
+        GREATER_EQUAL(KuduPredicate.ComparisonOp.GREATER_EQUAL),
+        EQUAL(KuduPredicate.ComparisonOp.EQUAL),
+        LESS(KuduPredicate.ComparisonOp.LESS),
+        LESS_EQUAL(KuduPredicate.ComparisonOp.LESS_EQUAL),
+        IS_NOT_NULL(null),
+        IS_NULL(null),
+        IS_IN(null);
+
+        final KuduPredicate.ComparisonOp comparator;
+
+        FilterType(KuduPredicate.ComparisonOp comparator) {
+            this.comparator = comparator;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
new file mode 100644
index 0000000..b1366ba
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+
+import java.util.List;
+
+/**
+ * Static conversions between Kudu client types and {@link KuduRow}: scan results are
+ * turned into rows, and rows are turned into write operations.
+ */
+public final class KuduMapper {
+
+    private KuduMapper() { }
+
+    /**
+     * Copies every projected column of a scan result into a new {@link KuduRow}, keeping
+     * the column position and name. Null cells are stored as {@code null}.
+     *
+     * @param row scan result to convert
+     * @return the populated row
+     * @throws IllegalArgumentException if a non-null cell has an unsupported column type
+     */
+    public static KuduRow toKuduRow(RowResult row) {
+        final List<ColumnSchema> projection = row.getColumnProjection().getColumns();
+        final int arity = projection.size();
+
+        final KuduRow result = new KuduRow(arity);
+        for (int index = 0; index < arity; index++) {
+            final ColumnSchema column = projection.get(index);
+            final Object cell = row.isNull(index) ? null : readCell(row, index, column.getType());
+            result.setField(index, column.getName(), cell);
+        }
+        return result;
+    }
+
+    // Reads the non-null cell at the given position with the getter matching its Kudu type.
+    private static Object readCell(RowResult row, int index, Type type) {
+        switch (type) {
+            case BINARY:
+                return row.getBinary(index);
+            case STRING:
+                return row.getString(index);
+            case BOOL:
+                return row.getBoolean(index);
+            case DOUBLE:
+                return row.getDouble(index);
+            case FLOAT:
+                return row.getFloat(index);
+            case INT8:
+                return row.getByte(index);
+            case INT16:
+                return row.getShort(index);
+            case INT32:
+                return row.getInt(index);
+            case INT64:
+            case UNIXTIME_MICROS:
+                // NOTE(review): UNIXTIME_MICROS is returned as the raw long here, while the
+                // write path multiplies the value by 1000 -- confirm the asymmetry is intended.
+                return row.getLong(index);
+            default:
+                throw new IllegalArgumentException("Illegal var type: " + type);
+        }
+    }
+
+
+    /**
+     * Creates a write operation for the table and fills it from the row: for each column
+     * of the table schema the row field at the same position is written under the
+     * column's name, and null fields are written as nulls.
+     *
+     * @param table     target table
+     * @param writeMode insert, update or upsert
+     * @param row       values, indexed in table-schema column order
+     * @return the populated operation
+     * @throws IllegalArgumentException if a non-null field belongs to an unsupported column type
+     */
+    public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) {
+        final Operation operation = toOperation(table, writeMode);
+        final PartialRow target = operation.getRow();
+
+        final List<ColumnSchema> tableColumns = table.getSchema().getColumns();
+
+        for (int index = 0; index < tableColumns.size(); index++) {
+            final ColumnSchema column = tableColumns.get(index);
+            final String columnName = column.getName();
+            final Object value = row.getField(index);
+            if (value == null) {
+                target.setNull(columnName);
+                continue;
+            }
+            writeCell(target, columnName, column.getType(), value);
+        }
+        return operation;
+    }
+
+    // Stores a non-null value under the column name, cast to the Java type of the Kudu type.
+    private static void writeCell(PartialRow target, String columnName, Type type, Object value) {
+        switch (type) {
+            case STRING:
+                target.addString(columnName, (String) value);
+                return;
+            case FLOAT:
+                target.addFloat(columnName, (Float) value);
+                return;
+            case INT8:
+                target.addByte(columnName, (Byte) value);
+                return;
+            case INT16:
+                target.addShort(columnName, (Short) value);
+                return;
+            case INT32:
+                target.addInt(columnName, (Integer) value);
+                return;
+            case INT64:
+                target.addLong(columnName, (Long) value);
+                return;
+            case DOUBLE:
+                target.addDouble(columnName, (Double) value);
+                return;
+            case BOOL:
+                target.addBoolean(columnName, (Boolean) value);
+                return;
+            case UNIXTIME_MICROS:
+                //*1000 to correctly create date on kudu
+                target.addLong(columnName, ((Long) value) * 1000);
+                return;
+            case BINARY:
+                target.addBinary(columnName, (byte[]) value);
+                return;
+            default:
+                throw new IllegalArgumentException("Illegal var type: " + type);
+        }
+    }
+
+    /**
+     * Creates an empty operation of the requested kind.
+     *
+     * @param table     target table
+     * @param writeMode insert, update or upsert
+     * @return a new insert or update for those modes, otherwise a new upsert
+     */
+    public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) {
+        switch (writeMode) {
+            case INSERT:
+                return table.newInsert();
+            case UPDATE:
+                return table.newUpdate();
+            default:
+                return table.newUpsert();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
new file mode 100644
index 0000000..03f5e5c
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.flink.types.Row;
+import org.apache.kudu.Schema;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.*;
+import java.util.stream.Stream;
+
+/**
+ * A Flink {@link Row} that additionally remembers which column name is stored at each position,
+ * so values can be looked up by name and copied to or from plain Java objects via reflection.
+ */
+public class KuduRow extends Row {
+
+    /**
+     * Column name mapped to its position in the row.
+     * Initialised at declaration so that every constructor can safely call
+     * {@link #setField(int, String, Object)}.
+     */
+    private final Map<String, Integer> rowNames = new LinkedHashMap<>();
+
+    /**
+     * Creates an empty row with {@code arity} positions and no named columns.
+     *
+     * @param arity number of positions in the row
+     */
+    public KuduRow(Integer arity) {
+        super(arity);
+    }
+
+    /**
+     * Builds a row from the non-static, non-transient fields of {@code object}, including the
+     * ones declared by its superclasses. The arity of the row is the number of such fields; each
+     * field that passes the schema filter is stored at the index the schema reports for the
+     * column with the same name.
+     *
+     * NOTE(review): the filter expects {@code Schema#getColumn} to return {@code null} for a
+     * field without a matching column; some Kudu client versions may throw instead - confirm.
+     *
+     * @param object source of the field values
+     * @param schema schema used to select the fields and to resolve their positions
+     * @throws IllegalArgumentException if a field value cannot be read reflectively
+     */
+    public KuduRow(Object object, Schema schema) {
+        super(validFields(object));
+        for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
+            basicValidation(c.getDeclaredFields())
+                    .filter(field -> schema.getColumn(field.getName()) != null)
+                    .forEach(cField -> {
+                        try {
+                            cField.setAccessible(true);
+                            setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object));
+                        } catch (IllegalAccessException e) {
+                            String error = String.format("Cannot get value for %s", cField.getName());
+                            throw new IllegalArgumentException(error, e);
+                        }
+                    });
+        }
+    }
+
+    /**
+     * Returns the value stored under the given column name.
+     *
+     * @param name column name previously registered through {@link #setField(int, String, Object)}
+     * @return the value at the position registered for {@code name}, possibly {@code null}
+     * @throws NullPointerException if no position is registered for {@code name}
+     */
+    public Object getField(String name) {
+        // Fail with a descriptive message instead of an anonymous unboxing NullPointerException.
+        int pos = Objects.requireNonNull(rowNames.get(name),
+                () -> String.format("Unknown field name: %s", name));
+        return super.getField(pos);
+    }
+
+    /**
+     * Stores {@code value} at {@code pos} and registers {@code name} for that position.
+     *
+     * @param pos   position in the row
+     * @param name  column name to associate with the position
+     * @param value value to store, may be {@code null}
+     */
+    public void setField(int pos, String name, Object value) {
+        super.setField(pos, value);
+        this.rowNames.put(name, pos);
+    }
+
+    /**
+     * Tells whether the value at the given position is {@code null}.
+     *
+     * @param pos position in the row
+     * @return {@code true} if no value is stored at {@code pos}
+     */
+    public boolean isNull(int pos) {
+        return getField(pos) == null;
+    }
+
+    /** Counts the non-static, non-transient fields of the object's class and all its superclasses. */
+    private static int validFields(Object object) {
+        long validField = 0L;
+        for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) {
+            validField += basicValidation(c.getDeclaredFields()).count();
+        }
+        return (int) validField;
+    }
+
+    /** Keeps only the fields that are neither static nor transient. */
+    private static Stream<Field> basicValidation(Field[] fields) {
+        return Arrays.stream(fields)
+                .filter(cField -> !Modifier.isStatic(cField.getModifiers()))
+                .filter(cField -> !Modifier.isTransient(cField.getModifiers()));
+    }
+
+    /**
+     * Exposes the named columns as a map from column name to value, ordered by row position.
+     *
+     * @return a new insertion-ordered map; values may be {@code null}
+     */
+    public Map<String,Object> blindMap() {
+        Map<String,Object> toRet = new LinkedHashMap<>();
+        rowNames.entrySet().stream()
+                .sorted(Comparator.comparing(Map.Entry::getValue))
+                .forEachOrdered(entry -> toRet.put(entry.getKey(), super.getField(entry.getValue())));
+        return toRet;
+    }
+
+    /**
+     * Creates an instance of {@code clazz} and copies every non-null row value into the
+     * non-static, non-transient field with the same name (inherited fields included).
+     * A {@link Date} value is converted to its epoch milliseconds when the target field is a
+     * {@link Long}.
+     *
+     * @param clazz type to instantiate; must expose a public no-argument constructor
+     * @param <P>   type of the returned object
+     * @return the populated instance
+     * @throws IllegalArgumentException if the instance cannot be created or a field cannot be set
+     */
+    public <P> P blind(Class<P> clazz) {
+        P target = createInstance(clazz);
+
+        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+            for (Field cField : c.getDeclaredFields()) {
+                int modifiers = cField.getModifiers();
+                if (!rowNames.containsKey(cField.getName())
+                        || Modifier.isStatic(modifiers)
+                        || Modifier.isTransient(modifiers)) {
+                    continue;
+                }
+
+                Object value = getField(cField.getName());
+                if (value == null) {
+                    // Leave the field at whatever the no-arg constructor assigned.
+                    continue;
+                }
+                if (cField.getType() == Long.class && value.getClass() == Date.class) {
+                    value = Long.valueOf(((Date) value).getTime());
+                }
+
+                try {
+                    cField.setAccessible(true);
+                    cField.set(target, value);
+                } catch (IllegalAccessException e) {
+                    String error = String.format("Cannot set value for %s", cField.getName());
+                    throw new IllegalArgumentException(error, e);
+                }
+            }
+        }
+
+        return target;
+    }
+
+    /** Instantiates {@code clazz} through its public no-argument constructor. */
+    private static <P> P createInstance(Class<P> clazz) {
+        try {
+            return clazz.getConstructor().newInstance();
+        } catch (ReflectiveOperationException e) {
+            String error = String.format("Cannot create instance for %s", clazz.getSimpleName());
+            throw new IllegalArgumentException(error, e);
+        }
+    }
+
+    /** Renders the row as the string form of {@link #blindMap()}. */
+    @Override
+    public String toString() {
+        return blindMap().toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
new file mode 100644
index 0000000..dfea382
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.CreateTableOptions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class KuduTableInfo implements Serializable {
+
+    private static final Integer DEFAULT_REPLICAS = 1;
+    private static final boolean DEFAULT_CREATE_IF_NOT_EXIST = false;
+
+    private Integer replicas;
+    private String name;
+    private boolean createIfNotExist;
+    private List<KuduColumnInfo> columns;
+
+    private KuduTableInfo(String name){
+        this.name = name;
+        this.replicas = DEFAULT_REPLICAS;
+        this.createIfNotExist = DEFAULT_CREATE_IF_NOT_EXIST;
+        this.columns = new ArrayList<>();
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Schema getSchema() {
+        if(hasNotColumns()) return null;
+        List<ColumnSchema> schemaColumns = new ArrayList<>();
+        for(KuduColumnInfo column : columns){
+            schemaColumns.add(column.columnSchema());
+        }
+        return new Schema(schemaColumns);
+    }
+
+    public boolean createIfNotExist() {
+        return createIfNotExist;
+    }
+
+    public CreateTableOptions getCreateTableOptions() {
+        CreateTableOptions options = new CreateTableOptions();
+        if(replicas!=null){
+            options.setNumReplicas(replicas);
+        }
+        if(hasColummns()) {
+            List<String> rangeKeys = new ArrayList<>();
+            List<String> hashKeys = new ArrayList<>();
+            for(KuduColumnInfo column : columns){
+                if(column.isRangeKey()){
+                    rangeKeys.add(column.name());
+                }
+                if(column.isHashKey()){
+                    hashKeys.add(column.name());
+                }
+            }
+            options.setRangePartitionColumns(rangeKeys);
+            options.addHashPartitions(hashKeys, replicas*2);
+        }
+
+        return options;
+    }
+
+    public boolean hasNotColumns(){
+        return !hasColummns();
+    }
+    public boolean hasColummns(){
+        return (columns!=null && columns.size()>0);
+    }
+
+    public static class Builder {
+        KuduTableInfo table;
+
+        private Builder(String name) {
+            table = new KuduTableInfo(name);
+        }
+
+        public static Builder create(String name) {
+            return new Builder(name);
+        }
+
+        public static Builder open(String name) {
+            return new Builder(name);
+        }
+
+        public Builder createIfNotExist(boolean createIfNotExist) {
+            this.table.createIfNotExist = createIfNotExist;
+            return this;
+        }
+
+        public Builder replicas(int replicas) {
+            if (replicas == 0) return this;
+            this.table.replicas = replicas;
+            return this;
+        }
+
+        public Builder columns(List<KuduColumnInfo> columns) {
+            if(columns==null) return this;
+            this.table.columns.addAll(columns);
+            return this;
+        }
+
+        public Builder addColumn(KuduColumnInfo column) {
+            if(column==null) return this;
+            this.table.columns.add(column);
+            return this;
+        }
+
+        public KuduTableInfo build() {
+            return table;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
new file mode 100644
index 0000000..8cfc102
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormatTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link KuduInputFormat}, run against the cluster addressed by {@code hostsCluster}.
+ */
+public class KuduInputFormatTest extends KuduDatabase {
+
+    /** A null master address must be rejected at construction time. */
+    @Test
+    public void testInvalidKuduMaster() throws IOException {
+        KuduTableInfo books = booksTableInfo("books",false);
+        Assertions.assertThrows(
+                NullPointerException.class,
+                () -> new KuduInputFormat(null, books));
+    }
+
+    /** A null table description must be rejected at construction time. */
+    @Test
+    public void testInvalidTableInfo() throws IOException {
+        Assertions.assertThrows(
+                NullPointerException.class,
+                () -> new KuduInputFormat(hostsCluster, null));
+    }
+
+    /** Reading the prepared table without projection returns every row. */
+    @Test
+    public void testInputFormat() throws Exception {
+        KuduTableInfo books = booksTableInfo("books",true);
+        setUpDatabase(books);
+
+        List<KuduRow> read = readRows(books);
+        Assertions.assertEquals(5, read.size());
+
+        cleanDatabase(books);
+    }
+
+    /** Projecting two columns returns every row, each with exactly two fields. */
+    @Test
+    public void testInputFormatWithProjection() throws Exception {
+        KuduTableInfo books = booksTableInfo("books",true);
+        setUpDatabase(books);
+
+        List<KuduRow> read = readRows(books,"title","id");
+        Assertions.assertEquals(5, read.size());
+        read.forEach(row -> Assertions.assertEquals(2, row.getArity()));
+
+        cleanDatabase(books);
+    }
+
+    /**
+     * Reads all rows of the table through a {@link KuduInputFormat}, split by split.
+     *
+     * @param tableInfo       table to read
+     * @param fieldProjection columns passed to the input format as projection
+     * @return every non-null record produced by the input format
+     */
+    public static List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
+        KuduInputFormat inputFormat = new KuduInputFormat(hostsCluster, tableInfo)
+                .withTableProjections(fieldProjection);
+
+        List<KuduRow> collected = new ArrayList<>();
+        for (KuduInputFormat.KuduInputSplit split : inputFormat.createInputSplits(1)) {
+            inputFormat.open(split);
+            while (!inputFormat.reachedEnd()) {
+                KuduRow next = inputFormat.nextRecord(new KuduRow(5));
+                if (next != null) {
+                    collected.add(next);
+                }
+            }
+        }
+        inputFormat.close();
+
+        return collected;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
new file mode 100644
index 0000000..6eb5ebe
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduOuputFormatTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests for {@link KuduOutputFormat}, run against the cluster addressed by {@code hostsCluster}.
+ */
+public class KuduOuputFormatTest extends KuduDatabase {
+
+    /** A null master address must be rejected at construction time. */
+    @Test
+    public void testInvalidKuduMaster() throws IOException {
+        KuduTableInfo info = booksTableInfo(UUID.randomUUID().toString(),false);
+        Assertions.assertThrows(
+                NullPointerException.class,
+                () -> new KuduOutputFormat<>(null, info));
+    }
+
+    /** A null table description must be rejected at construction time. */
+    @Test
+    public void testInvalidTableInfo() throws IOException {
+        Assertions.assertThrows(
+                NullPointerException.class,
+                () -> new KuduOutputFormat<>(hostsCluster, null));
+    }
+
+    /** Opening the format for a randomly named table, requested with the {@code false} flag, must fail. */
+    @Test
+    public void testNotTableExist() throws IOException {
+        KuduTableInfo info = booksTableInfo(UUID.randomUUID().toString(),false);
+        KuduOutputFormat format = new KuduOutputFormat<>(hostsCluster, info);
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> format.open(0,1));
+    }
+
+    /** Rows written with strong consistency are all readable after closing the format. */
+    @Test
+    public void testOutputWithStrongConsistency() throws Exception {
+        KuduTableInfo info = booksTableInfo(UUID.randomUUID().toString(),true);
+        KuduOutputFormat format = new KuduOutputFormat<>(hostsCluster, info)
+                .withStrongConsistency();
+
+        format.open(0,1);
+        writeBooks(format);
+        format.close();
+
+        List<KuduRow> read = KuduInputFormatTest.readRows(info);
+        Assertions.assertEquals(5, read.size());
+
+        cleanDatabase(info);
+    }
+
+    /** Rows written with eventual consistency are all readable after a short wait. */
+    @Test
+    public void testOutputWithEventualConsistency() throws Exception {
+        KuduTableInfo info = booksTableInfo(UUID.randomUUID().toString(),true);
+        KuduOutputFormat format = new KuduOutputFormat<>(hostsCluster, info)
+                .withEventualConsistency();
+
+        format.open(0,1);
+        writeBooks(format);
+
+        // sleep to allow eventual consistency to finish
+        Thread.sleep(1000);
+
+        format.close();
+
+        List<KuduRow> read = KuduInputFormatTest.readRows(info);
+        Assertions.assertEquals(5, read.size());
+
+        cleanDatabase(info);
+    }
+
+    /** Writes every row of the books fixture through the given, already opened, format. */
+    private void writeBooks(KuduOutputFormat format) throws Exception {
+        for (KuduRow book : booksDataRow()) {
+            format.writeRecord(book);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
new file mode 100644
index 0000000..9e9ae93
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/KuduSinkTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
+import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests for {@link KuduSink}, run against the cluster addressed by {@code hostsCluster}.
+ */
+public class KuduSinkTest extends KuduDatabase {
+
+
+    /**
+     * A null master address must be rejected when the sink is constructed.
+     * NOTE(review): assumes KuduSink validates its constructor arguments the same way
+     * KuduOutputFormat does - confirm against KuduSink.
+     */
+    @Test
+    public void testInvalidKuduMaster() throws IOException {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
+        // Exercise the sink itself rather than the output format.
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(null, tableInfo));
+    }
+
+    /**
+     * A null table description must be rejected when the sink is constructed.
+     * NOTE(review): same assumption about KuduSink argument validation as above - confirm.
+     */
+    @Test
+    public void testInvalidTableInfo() throws IOException {
+        Assertions.assertThrows(NullPointerException.class, () -> new KuduSink<>(hostsCluster, null));
+    }
+
+    /** Opening the sink for a randomly named table, requested with the {@code false} flag, must fail. */
+    @Test
+    public void testNotTableExist() throws IOException {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),false);
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo);
+        Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
+    }
+
+    /** Rows written with strong consistency are all readable after closing the sink. */
+    @Test
+    public void testOutputWithStrongConsistency() throws Exception {
+
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo)
+                .withStrongConsistency();
+        sink.open(new Configuration());
+
+        for (KuduRow kuduRow : booksDataRow()) {
+            sink.invoke(kuduRow);
+        }
+        sink.close();
+
+        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+        Assertions.assertEquals(5, rows.size());
+
+        // Clean up the randomly named table, as the output format tests do.
+        cleanDatabase(tableInfo);
+    }
+
+    /** Rows written with eventual consistency are all readable after a short wait. */
+    @Test
+    public void testOutputWithEventualConsistency() throws Exception {
+        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(),true);
+        KuduSink sink = new KuduSink<>(hostsCluster, tableInfo)
+                .withEventualConsistency();
+        sink.open(new Configuration());
+
+        for (KuduRow kuduRow : booksDataRow()) {
+            sink.invoke(kuduRow);
+        }
+
+        // sleep to allow eventual consistency to finish
+        Thread.sleep(1000);
+
+        sink.close();
+
+        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
+        Assertions.assertEquals(5, rows.size());
+
+        // Clean up the randomly named table, as the output format tests do.
+        cleanDatabase(tableInfo);
+    }
+
+}