You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/08/04 08:59:34 UTC

[3/5] camel git commit: CAMEL-11571: Added Google Cloud BigQuery component

CAMEL-11571: Added Google Cloud BigQuery component

Added a new component for Google Cloud BigQuery with Producer only support


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7759b260
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7759b260
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7759b260

Branch: refs/heads/master
Commit: 7759b26059fbe6b437bc30be5db9f63aec8dc846
Parents: 78a7bdc
Author: Sarel Lugtenburg <sa...@scentregroup.com>
Authored: Wed Aug 2 20:03:17 2017 +1000
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Aug 4 10:26:26 2017 +0200

----------------------------------------------------------------------
 components/camel-google-bigquery/.gitignore     |   7 +
 components/camel-google-bigquery/ReadMe.md      |  43 ++++
 components/camel-google-bigquery/pom.xml        | 155 +++++++++++++
 .../main/docs/google-bigquery-component.adoc    | 165 ++++++++++++++
 .../bigquery/GoogleBigQueryComponent.java       | 101 +++++++++
 .../bigquery/GoogleBigQueryConfiguration.java   | 124 +++++++++++
 .../GoogleBigQueryConnectionFactory.java        | 216 ++++++++++++++++++
 .../bigquery/GoogleBigQueryConstants.java       |  24 ++
 .../google/bigquery/GoogleBigQueryEndpoint.java | 108 +++++++++
 .../google/bigquery/GoogleBigQueryProducer.java | 219 +++++++++++++++++++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 +++++++++++++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../org/apache/camel/component/google-bigquery  |  18 ++
 .../google/bigquery/integration/AsyncTest.java  |  98 +++++++++
 .../integration/BigQueryTestSupport.java        | 156 +++++++++++++
 .../integration/DynamicTableIdTest.java         | 101 +++++++++
 .../bigquery/integration/InsertIdTest.java      | 148 +++++++++++++
 .../bigquery/integration/SingleRowTest.java     |  84 +++++++
 .../google/bigquery/unit/BaseBigQueryTest.java  |  70 ++++++
 .../unit/GoogleBigQueryProducerTest.java        | 120 ++++++++++
 .../src/test/resources/log4j.properties         |  37 ++++
 .../src/test/resources/logging.properties       |  21 ++
 .../src/test/resources/schema/simple-table.json |   6 +
 .../src/test/resources/simple.properties        |  24 ++
 .../src/main/resources/application.properties   |  40 ++++
 parent/pom.xml                                  |   1 +
 .../camel-google-bigquery-starter/pom.xml       |  61 ++++++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 +++++++++++++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../src/main/resources/META-INF/spring.provides |  17 ++
 .../spring-boot/components-starter/pom.xml      |   1 +
 31 files changed, 2593 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/.gitignore
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/.gitignore b/components/camel-google-bigquery/.gitignore
new file mode 100644
index 0000000..d10190d
--- /dev/null
+++ b/components/camel-google-bigquery/.gitignore
@@ -0,0 +1,7 @@
+.idea
+*.iml
+.settings
+.project
+.classpath
+target
+deploy*.cmd

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/ReadMe.md
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/ReadMe.md b/components/camel-google-bigquery/ReadMe.md
new file mode 100644
index 0000000..d29580b
--- /dev/null
+++ b/components/camel-google-bigquery/ReadMe.md
@@ -0,0 +1,43 @@
+## Camel Google BigQuery Component testing
+
+The unit tests provided are somewhat limited.
+
+Due to the nature of the component, it needs to be tested against a google BigQuery instance as no
+emulator is available.
+
+* Unit : <br>
+  Standalone tests that can be conducted on their own
+* Integration : <br>
+  Tests against a Google BigQuery instance
+
+### Execution of integration tests
+
+A Google Cloud account with a configured BigQuery instance is required with a dataset created.
+
+Google BigQuery component authentication is targeted for use with the GCP Service Accounts.
+For more information please refer to the [Google Cloud Platform Auth Guide](https://cloud.google.com/docs/authentication).
+
+Google security credentials for the tests can be set in the `src/test/resources/simple.properties` file by setting
+either one of the following in order of preference:
+
+* Service Account Email and Service Account Key (PEM format) (`service.account` and `service.key`)
+* GCP credentials file location (`service.credentialsFileLocation`)
+
+Or implicitly, where the connection factory falls back on
+[Application Default Credentials](https://developers.google.com/identity/protocols/application-default-credentials#howtheywork).
+
+**Note:** The location of the default credentials file is configurable via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable.
+
+Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively.
+
+For the tests, `project.id` and `bigquery.datasetId` need to be configured. By default
+the current Google user will be used to connect, but credentials can be provided either by
+account/key (via `service.account` and `service.key`) or a credentials file (`service.credentialsFileLocation`).
+
+Running tests against BigQuery instance:
+
+```
+mvn -Pgoogle-bigquery-test verify
+```
+
+

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/pom.xml b/components/camel-google-bigquery/pom.xml
new file mode 100644
index 0000000..2581787
--- /dev/null
+++ b/components/camel-google-bigquery/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<!-- Maven build descriptor for the camel-google-bigquery component module. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>2.20.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-google-bigquery</artifactId>
+    <packaging>jar</packaging>
+    <name>Camel :: GoogleBigQuery</name>
+    <description>Camel Component for Google Cloud Platform BigQuery</description>
+
+    <!-- schemeName, componentName and componentPackage are referenced by the
+         camel-api-component-maven-plugin configuration further down. -->
+    <properties>
+        <schemeName>google-bigquery</schemeName>
+        <componentName>GoogleBigQuery</componentName>
+        <componentPackage>org.apache.camel.component.google.bigquery</componentPackage>
+        <camel.osgi.export.pkg>org.apache.camel.component.google.bigquery</camel.osgi.export.pkg>
+        <camel.osgi.export.service>
+            org.apache.camel.spi.ComponentResolver;component=google-bigquery
+        </camel.osgi.export.service>
+    </properties>
+
+    <!-- Dependencies without an explicit version rely on version management outside this file. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.apis</groupId>
+            <artifactId>google-api-services-bigquery</artifactId>
+            <version>${google-api-services-bigquery-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <!-- Default build: only run *Test classes located directly in a "unit" directory. -->
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <includes>
+                        <include>**/unit/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>2.6</version>
+                <configuration>
+                    <encoding>UTF-8</encoding>
+                </configuration>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+                <!-- NOTE(review): ${outPackage} is not defined in this POM's <properties>;
+                     confirm it is supplied by a parent POM or drop the element. -->
+                <plugin>
+                    <groupId>org.apache.camel</groupId>
+                    <artifactId>camel-api-component-maven-plugin</artifactId>
+                    <version>${project.version}</version>
+                    <configuration>
+                        <scheme>${schemeName}</scheme>
+                        <componentName>${componentName}</componentName>
+                        <componentPackage>${componentPackage}</componentPackage>
+                        <outPackage>${outPackage}</outPackage>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+
+    </build>
+
+    <reporting>
+        <plugins>
+            <!-- Same plugin configuration as in pluginManagement above, repeated for reporting. -->
+            <plugin>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-api-component-maven-plugin</artifactId>
+                <version>${project.version}</version>
+                <configuration>
+                    <scheme>${schemeName}</scheme>
+                    <componentName>${componentName}</componentName>
+                    <componentPackage>${componentPackage}</componentPackage>
+                    <outPackage>${outPackage}</outPackage>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+
+    <profiles>
+        <!-- Activate with -Pgoogle-bigquery-test: widens the surefire include pattern to every
+             *Test class (not just those under "unit") and runs them in a single reused fork
+             with a 300 second timeout. -->
+        <profile>
+            <id>google-bigquery-test</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <childDelegation>false</childDelegation>
+                            <useFile>true</useFile>
+                            <forkCount>1</forkCount>
+                            <reuseForks>true</reuseForks>
+                            <forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds>
+                            <includes>
+                                <include>**/*Test.java</include>
+                            </includes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc b/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
new file mode 100644
index 0000000..ce65295
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
@@ -0,0 +1,165 @@
+## Google BigQuery Component
+
+*Available as of Camel version 2.20*
+
+### Component Description
+
+The Google BigQuery component provides access
+to https://cloud.google.com/bigquery/[Cloud BigQuery Infrastructure] via
+the https://developers.google.com/api-client-library/java/apis/bigquery/v2[Google Client Services API].
+
+The current implementation does not use gRPC.
+
+The current implementation does not support querying BigQuery, i.e. it is producer-only.
+
+Maven users will need to add the following dependency to their pom.xml
+for this component:
+
+[source,xml]
+------------------------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-google-bigquery</artifactId>
+    <version>x.x.x</version>
+    <!-- use the same version as your Camel core version -->
+</dependency>
+
+------------------------------------------------------
+
+[[GoogleBigQuery-AuthenticationConfiguration]]
+
+### Authentication Configuration
+
+Google BigQuery component authentication is targeted for use with the GCP Service Accounts.
+For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide]
+
+Google security credentials can be set explicitly via one of the two options:
+
+* Service Account Email and Service Account Key (PEM format)
+* GCP credentials file location
+
+If both are set, the Service Account Email/Key will take precedence.
+
+Or implicitly, where the connection factory falls back on
+https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials].
+
+NOTE: The location of the default credentials file is configurable via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable.
+
+Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively.
+
+### URI Format
+
+[source,java]
+--------------------------------------------------------
+        google-bigquery://project-id:datasetId[:tableId]?[options]
+--------------------------------------------------------
+
+
+### Options
+
+// component options: START
+The Google BigQuery component supports 2 options which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|=======================================================================
+| Name | Description | Default | Type
+| **connectionFactory** (common) | Sets the connection factory to use: provides the ability to explicitly manage connection credentials: - the path to the key file - the Service Account Key / Email pair |  | GoogleBigQueryConnection Factory
+| **resolveProperty Placeholders** (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
+|=======================================================================
+// component options: END
+
+// endpoint options: START
+The Google BigQuery endpoint is configured using URI syntax:
+
+    google-bigquery:projectId:datasetId[:tableId]
+
+with the following path and query parameters:
+
+#### Path Parameters (3 parameters):
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|=======================================================================
+| Name | Description | Default | Type
+| **projectId** | *Required* Project Id |  | String
+| **datasetId** | *Required* Dataset Id |  | String
+| **tableId** | Table Id. If not supplied a table name need to be set in the exchange header |  | String
+|=======================================================================
+
+#### Query Parameters (9 parameters):
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|=======================================================================
+| Name | Description | Default | Type
+| **concurrentProducers** (producer) | The maximum number of simultaneous requests when running in async mode |  | Integer
+| **connectionFactory** (common) | ConnectionFactory to obtain connection to BigQuery Service. If non provided the default will be used. |  | GoogleBigQueryConnection Factory
+| **loggerId** (common) | Logger ID to use when a match to the parent route required |  | String
+| **useAsInsertId** (producer) | Field in message to use as insert id. |  | String
+| **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
+| **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
+| **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
+| **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+|=======================================================================
+// endpoint options: END
+
+### Message Headers
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Name |Type |Description
+|`CamelGoogleBigQuery.TableSuffix` |`String` |Table suffix to use when inserting data
+|`CamelGoogleBigQuery.InsertId` |`String` |InsertId to use when inserting data
+|`CamelGoogleBigQuery.PartitionDecorator` |`String` |Partition decorator to indicate partition to use when inserting data
+|`CamelGoogleBigQuery.TableId` |`String` |Table id where data will be submitted. If specified will override endpoint configuration
+|=======================================================================
+
+
+### Producer Endpoints
+
+Producer endpoints can accept and deliver to BigQuery individual and grouped
+exchanges alike. Grouped exchanges have `Exchange.GROUPED_EXCHANGE` property set.
+
+Google BigQuery producer will send a grouped exchange in a single api call unless different table suffix or
+partition decorators are specified in which case it will break it down to ensure data is written with the
+correct suffix or partition decorator.
+
+Google BigQuery endpoint expects the payload to be either a map or a list of maps. A payload containing a map
+will insert a single row and a payload containing a list of maps will insert a row for each entry in the list.
+
+### Template tables
+
+Reference: https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
+
+Templated tables can be specified using the `GoogleBigQueryConstants.TABLE_SUFFIX` header.
+
+I.e. the following route will create tables and insert records sharded on a per day basis:
+
+[source,java]
+------------------------------------------------------
+from("direct:start")
+.setHeader(GoogleBigQueryConstants.TABLE_SUFFIX, simple("_${date:now:yyyyMMdd}"))
+.to("google-bigquery:sampleProject:sampleDataset:sampleTable")
+
+------------------------------------------------------
+Note it is recommended to use partitioning for this use case.
+
+### Partitioning
+
+Reference: https://cloud.google.com/bigquery/docs/creating-partitioned-tables
+
+Partitioning is specified when creating a table and if set data will be automatically partitioned into
+separate tables. When inserting data a specific partition can be specified by setting the
+`GoogleBigQueryConstants.PARTITION_DECORATOR` header on the exchange.
+
+### Ensuring data consistency
+
+Reference: https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency
+
+An insert id can be set on the exchange with the header `GoogleBigQueryConstants.INSERT_ID` or by specifying
+the query parameter `useAsInsertId`. As an insert id needs to be specified per inserted row, the exchange header can't
+be used when the payload is a list - if the payload is a list the `GoogleBigQueryConstants.INSERT_ID` will
+be ignored. In that case use the query parameter `useAsInsertId`.
+
+
+

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryComponent.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryComponent.java
new file mode 100644
index 0000000..ab03f2a
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryComponent.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.google.bigquery;
+
+import java.util.Map;
+
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.bigquery.Bigquery;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Camel component that creates {@link GoogleBigQueryEndpoint} instances from endpoint paths of
+ * the form {@code projectId:datasetId[:tableId]}.
+ * <p>
+ * The {@link Bigquery} client is requested from the component-level connection factory the first
+ * time an endpoint is created and is then passed to every endpoint this component creates.
+ */
+public class GoogleBigQueryComponent extends DefaultComponent {
+    // Obtained on the first createEndpoint call and reused afterwards.
+    private Bigquery bigQuery;
+    // NOTE(review): projectId and datasetId are only exposed through their accessors in this
+    // class; createEndpoint takes both values from the endpoint path instead.
+    private String projectId;
+    private String datasetId;
+
+    private GoogleBigQueryConnectionFactory connectionFactory;
+
+    // NOTE(review): transport and jsonFactory are not referenced anywhere else in this class.
+    private final HttpTransport transport = new NetHttpTransport();
+    private final JsonFactory jsonFactory = new JacksonFactory();
+
+    // Asks the component-level connection factory for its default client.
+    private Bigquery getConnection() throws Exception {
+        return getConnectionFactory().getDefaultClient();
+    }
+
+
+    /**
+     * Creates an endpoint for the given path; an endpoint represents a single table.
+     *
+     * @param uri        the full endpoint URI, passed through to the endpoint
+     * @param remaining  the endpoint path, expected as {@code projectId:datasetId[:tableId]}
+     * @param parameters the endpoint options, bound onto a new {@link GoogleBigQueryConfiguration}
+     * @return a new {@link GoogleBigQueryEndpoint}
+     * @throws IllegalArgumentException if the path has fewer than two colon-separated parts
+     * @throws Exception if binding the options or obtaining the client fails
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        String[] parts = remaining.split(":");
+
+        // The table id is optional, so only the first two parts are mandatory. The message
+        // mirrors the one raised by GoogleBigQueryConfiguration.parseRemaining.
+        if (parts.length < 2) {
+            throw new IllegalArgumentException("Google BigQuery Endpoint format \"projectId:datasetId[:tableName]\"");
+        }
+
+        GoogleBigQueryConfiguration configuration = new GoogleBigQueryConfiguration();
+        setProperties(configuration, parameters);
+        configuration.parseRemaining(remaining);
+
+        // Fall back to the component-level factory when the endpoint did not bring its own.
+        if (configuration.getConnectionFactory() == null) {
+            configuration.setConnectionFactory(getConnectionFactory());
+        }
+
+        // NOTE(review): the client always comes from the component-level factory, even when the
+        // endpoint configuration carries its own connectionFactory - confirm this is intended.
+        if (bigQuery == null) {
+            bigQuery = getConnection();
+        }
+
+        return new GoogleBigQueryEndpoint(uri, bigQuery, configuration);
+    }
+
+    public void setProjectId(String projectId) {
+        this.projectId = projectId;
+    }
+
+    public void setDatasetId(String datasetId) {
+        this.datasetId = datasetId;
+    }
+
+    public String getProjectId() {
+        return projectId;
+    }
+
+    public String getDatasetId() {
+        return datasetId;
+    }
+
+    /**
+     * Sets the connection factory to use:
+     * provides the ability to explicitly manage connection credentials:
+     * - the path to the key file
+     * - the Service Account Key / Email pair
+     */
+    public GoogleBigQueryConnectionFactory getConnectionFactory() {
+        // Lazily create a default factory so callers never receive null.
+        if (connectionFactory == null) {
+            connectionFactory = new GoogleBigQueryConnectionFactory();
+        }
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(GoogleBigQueryConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConfiguration.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConfiguration.java
new file mode 100644
index 0000000..9565f93
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConfiguration.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.google.bigquery;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * Endpoint configuration for the Google BigQuery component.
+ * <p>
+ * Holds the URI path values (project, dataset and optional table id) together with the
+ * query options. Most setters return {@code this} so calls can be chained.
+ */
+@UriParams
+public class GoogleBigQueryConfiguration {
+    private static final String FORMAT_MESSAGE = "Google BigQuery Endpoint format \"projectId:datasetId[:tableName]\"";
+
+    @UriParam(name = "concurrentProducers", description = "The maximum number of simultaneous requests when running in async mode")
+    private int concurrentProducers;
+
+    @UriParam(name = "connectionFactory", description = "ConnectionFactory to obtain connection to Bigquery Service. If non provided the default one will be used")
+    private GoogleBigQueryConnectionFactory connectionFactory;
+
+    @UriParam(name = "loggerId", description = "Logger ID to use when a match to the parent route required")
+    private String loggerId;
+
+    @UriParam(name = "useAsInsertId", description = "Field name to use as insert id")
+    private String useAsInsertId;
+
+    @UriPath(label = "common", description = "Google Cloud Project Id") @Metadata(required = "true")
+    private String projectId;
+    @UriPath(label = "common", description = "BigQuery Dataset Id") @Metadata(required = "true")
+    private String datasetId;
+    @UriPath(label = "common", description = "BigQuery table id") @Metadata(required = "false")
+    private String tableId;
+
+    /**
+     * Populates project, dataset and (optionally) table id from the endpoint path.
+     * <p>
+     * The path is split on {@code ':'}. The first two parts are mandatory and must not be
+     * empty; a third part, when present, becomes the table id. Any further parts are ignored.
+     *
+     * @param remaining the endpoint path, expected as {@code projectId:datasetId[:tableId]}
+     * @throws IllegalArgumentException if the path is {@code null}, has fewer than two parts,
+     *                                  or has an empty project or dataset id
+     */
+    public void parseRemaining(String remaining) {
+        if (remaining == null) {
+            throw new IllegalArgumentException(FORMAT_MESSAGE);
+        }
+
+        String[] parts = remaining.split(":");
+
+        // Both required path parameters must be present and non-empty (":dataset" or
+        // "project::table" would otherwise yield an empty id).
+        if (parts.length < 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
+            throw new IllegalArgumentException(FORMAT_MESSAGE);
+        }
+
+        projectId = parts[0];
+        datasetId = parts[1];
+        if (parts.length > 2) {
+            tableId = parts[2];
+        }
+    }
+
+    public String getLoggerId() {
+        return loggerId;
+    }
+
+    public GoogleBigQueryConfiguration setLoggerId(String loggerId) {
+        this.loggerId = loggerId;
+        return this;
+    }
+
+    /**
+     * ConnectionFactory to obtain connection to Bigquery Service. If non provided the default will be used.
+     */
+    public GoogleBigQueryConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(GoogleBigQueryConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public int getConcurrentProducers() {
+        return concurrentProducers;
+    }
+
+    public GoogleBigQueryConfiguration setConcurrentProducers(int concurrentProducers) {
+        this.concurrentProducers = concurrentProducers;
+        return this;
+    }
+
+    public String getUseAsInsertId() {
+        return useAsInsertId;
+    }
+
+    public GoogleBigQueryConfiguration setUseAsInsertId(String useAsInsertId) {
+        this.useAsInsertId = useAsInsertId;
+        return this;
+    }
+
+    public String getProjectId() {
+        return projectId;
+    }
+
+    public GoogleBigQueryConfiguration setProjectId(String projectId) {
+        this.projectId = projectId;
+        return this;
+    }
+
+    public String getDatasetId() {
+        return datasetId;
+    }
+
+    public GoogleBigQueryConfiguration setDatasetId(String datasetId) {
+        this.datasetId = datasetId;
+        return this;
+    }
+
+    public String getTableId() {
+        return tableId;
+    }
+
+    public GoogleBigQueryConfiguration setTableId(String tableId) {
+        this.tableId = tableId;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConnectionFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConnectionFactory.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConnectionFactory.java
new file mode 100644
index 0000000..a46859b
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConnectionFactory.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.bigquery;
+
+import java.io.FileInputStream;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.Collection;
+import java.util.Collections;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.apache.ApacheHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.util.Base64;
+import com.google.api.client.util.Strings;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.BigqueryScopes;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds {@link Bigquery} API clients from the credentials configured on this factory.
+ * <p>
+ * Credentials are resolved in this order: an explicit service account together with its
+ * PEM private key, then a credentials file, then the Google application-default credentials.
+ * The client returned by {@link #getDefaultClient()} is cached; every setter on this class
+ * discards the cached client so that the next call rebuilds it with the new settings.
+ */
+public class GoogleBigQueryConnectionFactory {
+
+    private static final JsonFactory JSON_FACTORY = new JacksonFactory();
+
+    private final Logger logger = LoggerFactory.getLogger(GoogleBigQueryConnectionFactory.class);
+
+    private String serviceAccount;
+    private String serviceAccountKey;
+    private String credentialsFileLocation;
+    private String serviceURL;
+
+    // Lazily built by getDefaultClient() and cleared by resetClient().
+    private Bigquery client;
+
+    public GoogleBigQueryConnectionFactory() {
+    }
+
+    /**
+     * Returns the cached client, building it on first use.
+     *
+     * @return the shared client built on the default trusted transport
+     * @throws Exception if the transport or the credentials cannot be created
+     */
+    public synchronized Bigquery getDefaultClient() throws Exception {
+        if (this.client == null) {
+            this.client = buildClient();
+        }
+        return this.client;
+    }
+
+    /**
+     * Builds a new, uncached client backed by a pooled Apache HTTP client.
+     *
+     * @param parallelThreads used as both the per-route and the total connection limit of the pool
+     * @return a new client; a fresh instance is created on every call
+     * @throws Exception if the credentials cannot be created
+     */
+    public Bigquery getMultiThreadClient(int parallelThreads) throws Exception {
+
+        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+        cm.setDefaultMaxPerRoute(parallelThreads);
+        cm.setMaxTotal(parallelThreads);
+        CloseableHttpClient httpClient = HttpClients.createMinimal(cm);
+
+        return buildClient(new ApacheHttpTransport(httpClient));
+    }
+
+    /** Builds a client on top of {@link GoogleNetHttpTransport#newTrustedTransport()}. */
+    private Bigquery buildClient() throws Exception {
+        return buildClient(GoogleNetHttpTransport.newTrustedTransport());
+    }
+
+    /**
+     * Resolves the credential (account/key pair, then credentials file, then application
+     * default) and assembles the client on the given transport.
+     */
+    private Bigquery buildClient(HttpTransport httpTransport) throws Exception {
+
+        GoogleCredential credential = null;
+
+        if (!Strings.isNullOrEmpty(serviceAccount) && !Strings.isNullOrEmpty(serviceAccountKey)) {
+            logger.debug("Service Account and Key have been set explicitly. Initialising BigQuery using Service Account {}", serviceAccount);
+            credential = createFromAccountKeyPair(httpTransport);
+        }
+
+        if (credential == null && !Strings.isNullOrEmpty(credentialsFileLocation)) {
+            logger.debug("Key File Name has been set explicitly. Initialising BigQuery using Key File {}", credentialsFileLocation);
+            credential = createFromFile();
+        }
+
+        if (credential == null) {
+            logger.debug("No explicit Service Account or Key File Name have been provided. Initialising BigQuery using defaults ");
+            credential = createDefault();
+        }
+
+        Bigquery.Builder builder = new Bigquery.Builder(httpTransport, JSON_FACTORY, credential)
+                .setApplicationName("camel-google-bigquery");
+
+        // Local emulator, SOCKS proxy, etc.
+        if (serviceURL != null) {
+            builder.setRootUrl(serviceURL);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Loads the credential from {@code credentialsFileLocation}, scoping it to all
+     * BigQuery scopes when the credential requires scopes.
+     */
+    private GoogleCredential createFromFile() throws Exception {
+
+        GoogleCredential credential;
+        // Close the stream as soon as the credential is parsed so the file handle is not leaked.
+        try (FileInputStream in = new FileInputStream(credentialsFileLocation)) {
+            credential = GoogleCredential.fromStream(in);
+        }
+
+        if (credential.createScopedRequired()) {
+            credential = credential.createScoped(BigqueryScopes.all());
+        }
+
+        return credential;
+    }
+
+    /**
+     * Obtains the application-default credential, scoping it to the BigQuery scope
+     * when the credential requires scopes.
+     */
+    private GoogleCredential createDefault() throws Exception {
+        GoogleCredential credential = GoogleCredential.getApplicationDefault();
+
+        Collection<String> scopes = Collections.singletonList(BigqueryScopes.BIGQUERY);
+
+        if (credential.createScopedRequired()) {
+            credential = credential.createScoped(scopes);
+        }
+
+        return credential;
+    }
+
+    /**
+     * Builds a credential from the configured service account id and PEM private key.
+     * Any failure is rethrown as a {@link RuntimeException} carrying the original cause.
+     */
+    private GoogleCredential createFromAccountKeyPair(HttpTransport httpTransport) {
+        try {
+            return new GoogleCredential.Builder()
+                    .setTransport(httpTransport)
+                    .setJsonFactory(JSON_FACTORY)
+                    .setServiceAccountId(serviceAccount)
+                    .setServiceAccountScopes(BigqueryScopes.all())
+                    .setServiceAccountPrivateKey(getPrivateKeyFromString(serviceAccountKey))
+                    .build();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Decodes a PEM-formatted PKCS#8 key into an RSA {@link PrivateKey}.
+     *
+     * @param serviceKeyPem the key text; the BEGIN/END PRIVATE KEY markers and line breaks are stripped
+     * @return the decoded private key
+     * @throws RuntimeException if the text cannot be decoded or is not a valid RSA key
+     */
+    private PrivateKey getPrivateKeyFromString(String serviceKeyPem) {
+        PrivateKey privateKey;
+        try {
+            String privKeyPEM = serviceKeyPem.replace("-----BEGIN PRIVATE KEY-----", "")
+                                             .replace("-----END PRIVATE KEY-----", "")
+                                             .replace("\r", "")
+                                             .replace("\n", "");
+
+            byte[] encoded = Base64.decodeBase64(privKeyPEM);
+
+            PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
+            privateKey = KeyFactory.getInstance("RSA")
+                                   .generatePrivate(keySpec);
+        } catch (Exception e) {
+            String error = "Constructing Private Key from PEM string failed: " + e.getMessage();
+            logger.error(error, e);
+            // Carry the descriptive message along with the cause instead of dropping it.
+            throw new RuntimeException(error, e);
+        }
+        return privateKey;
+    }
+
+    public String getServiceAccount() {
+        return serviceAccount;
+    }
+
+    /**
+     * Sets the service account id and discards the cached client.
+     *
+     * @return this factory, so calls can be chained
+     */
+    public GoogleBigQueryConnectionFactory setServiceAccount(String serviceAccount) {
+        this.serviceAccount = serviceAccount;
+        resetClient();
+        return this;
+    }
+
+    public String getServiceAccountKey() {
+        return serviceAccountKey;
+    }
+
+    /**
+     * Sets the PEM private key of the service account and discards the cached client.
+     *
+     * @return this factory, so calls can be chained
+     */
+    public GoogleBigQueryConnectionFactory setServiceAccountKey(String serviceAccountKey) {
+        this.serviceAccountKey = serviceAccountKey;
+        resetClient();
+        return this;
+    }
+
+    public String getCredentialsFileLocation() {
+        return credentialsFileLocation;
+    }
+
+    /**
+     * Sets the path of the credentials file and discards the cached client.
+     *
+     * @return this factory, so calls can be chained
+     */
+    public GoogleBigQueryConnectionFactory setCredentialsFileLocation(String credentialsFileLocation) {
+        this.credentialsFileLocation = credentialsFileLocation;
+        resetClient();
+        return this;
+    }
+
+    public String getServiceURL() {
+        return serviceURL;
+    }
+
+    /**
+     * Sets the root URL used instead of the client's default one and discards the cached client.
+     *
+     * @return this factory, so calls can be chained
+     */
+    public GoogleBigQueryConnectionFactory setServiceURL(String serviceURL) {
+        this.serviceURL = serviceURL;
+        resetClient();
+        return this;
+    }
+
+    /** Drops the cached client so the next {@link #getDefaultClient()} call rebuilds it. */
+    private synchronized void resetClient() {
+        this.client = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java
new file mode 100644
index 0000000..d9849ce
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.bigquery;
+
+/**
+ * Names of the message headers read by the BigQuery producer.
+ */
+public interface GoogleBigQueryConstants {
+    /** Header holding the value sent as the {@code template_suffix} of the insert request. */
+    String TABLE_SUFFIX = "CamelGoogleBigQuery.TableSuffix";
+    /** Header holding the target table id; when present it is used instead of the configured table id. */
+    String TABLE_ID = "CamelGoogleBigQuery.TableId";
+    /** Header holding the insert id of a row; read only when no insert-id field is configured. */
+    String INSERT_ID = "CamelGoogleBigQuery.InsertId";
+    /** Header holding the partition decorator that is appended to the table id. */
+    String PARTITION_DECORATOR = "CamelGoogleBigQuery.PartitionDecorator";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryEndpoint.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryEndpoint.java
new file mode 100644
index 0000000..07f67db
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryEndpoint.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.bigquery;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import com.google.api.services.bigquery.Bigquery;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+
+/**
+ * BigQuery Endpoint Definition
+ * Represents a table within a BigQuery dataset
+ * Contains configuration details for a single table and the utility methods (such as check, create) to ease operations
+ * URI Parameters:
+ * * Logger ID - To ensure that logging is unified under Route Logger, the logger ID can be passed on
+ *               via an endpoint URI parameter
+ * * Partitioned - to indicate that the table needs to be partitioned - every UTC day to be written into a
+ *                 timestamped separate table
+ *                 side effect: Australian operational day is always split between two UTC days, and, therefore, tables
+ *
+ * Another consideration is that exceptions are not handled within the class. They are expected to bubble up and be handled
+ * by Camel.
+ */
+@UriEndpoint(scheme = "bigquery", title = "BigQuery", syntax = "bigquery:projectId:datasetId[:tableName]", label = "messaging")
+public class GoogleBigQueryEndpoint extends DefaultEndpoint {
+    @UriParam
+    protected final GoogleBigQueryConfiguration configuration;
+
+    private final Bigquery bigquery;
+    private final String uri;
+
+    private final Map<String, Boolean> verifiedTables = new ConcurrentHashMap<>();
+    private ExecutorService executorService;
+
+    protected GoogleBigQueryEndpoint(String uri, Bigquery bigquery, GoogleBigQueryConfiguration configuration) {
+        this.bigquery = bigquery;
+        this.uri = uri;
+        this.configuration = configuration;
+    }
+
+    public Producer createProducer() throws Exception {
+        GoogleBigQueryProducer producer = new GoogleBigQueryProducer(this, configuration);
+        if (configuration.getConcurrentProducers() > 0) {
+            executorService = getCamelContext()
+                    .getExecutorServiceManager()
+                    .newFixedThreadPool(
+                            this,
+                            "camel-google-bigquery",
+                            configuration.getConcurrentProducers()
+                    );
+        } else {
+            executorService = getCamelContext()
+                    .getExecutorServiceManager()
+                    .newDefaultThreadPool(this, "camel-google-bigquery");
+        }
+        return producer;
+    }
+
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("Cannot consume from the BigQuery endpoint: " + getEndpointUri());
+    }
+
+    public boolean isSingleton() {
+        return false;
+    }
+
+    public Bigquery getBigquery() {
+        return bigquery;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    @Override
+    protected String createEndpointUri() {
+        return uri;
+    }
+
+    @Override
+    public GoogleBigQueryComponent getComponent() {
+        return (GoogleBigQueryComponent)super.getComponent();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryProducer.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryProducer.java
new file mode 100644
index 0000000..9c72ec9
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryProducer.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.bigquery;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.api.client.util.Strings;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic BigQuery Producer
+ *
+ * Turns the bodies of incoming exchanges into rows and sends them to BigQuery with the
+ * tabledata insertAll call. The work runs on the executor service owned by the endpoint.
+ */
+public class GoogleBigQueryProducer extends DefaultAsyncProducer {
+
+    private final Logger logger;
+    private final GoogleBigQueryConfiguration configuration;
+
+    /**
+     * @param endpoint the endpoint this producer belongs to
+     * @param configuration the endpoint configuration; its logger id, when not blank, names the
+     *                      logger used by this producer, otherwise the class name is used
+     */
+    public GoogleBigQueryProducer(GoogleBigQueryEndpoint endpoint, GoogleBigQueryConfiguration configuration) {
+        super(endpoint);
+
+        this.configuration = configuration;
+
+        String loggerId = configuration.getLoggerId();
+        if (loggerId == null || loggerId.trim().isEmpty()) {
+            loggerId = this.getClass().getName();
+        }
+
+        logger = LoggerFactory.getLogger(loggerId);
+    }
+
+    /**
+     * Returns the exchanges to process: the list stored under the
+     * {@link Exchange#GROUPED_EXCHANGE} property when that property is set, otherwise a new
+     * list holding only the given exchange.
+     *
+     * @param exchange the incoming exchange
+     * @return the exchanges whose bodies are to be inserted
+     */
+    @SuppressWarnings("unchecked")
+    private static List<Exchange> prepareExchangeList(Exchange exchange) {
+
+        List<Exchange> entryList;
+
+        if (null == exchange.getProperty(Exchange.GROUPED_EXCHANGE)) {
+            entryList = new ArrayList<>();
+            entryList.add(exchange);
+        } else {
+            entryList = (List<Exchange>) exchange.getProperty(Exchange.GROUPED_EXCHANGE);
+        }
+
+        return entryList;
+    }
+
+    /**
+     * Process the exchange
+     *
+     * The incoming exchange can be a grouped exchange in which case all the exchanges will be combined.
+     *
+     * The incoming can be
+     * <ul>
+     *     <li>A map where all map keys will map to field records. One map object maps to one bigquery row</li>
+     *     <li>A list of maps. Each entry in the list will map to one bigquery row</li>
+     * </ul>
+     * Consecutive exchanges that share the same table id, table suffix and partition decorator
+     * are sent in one insert request; a change in any of the three starts a new request.
+     *
+     * @param exchange the incoming (possibly grouped) exchange
+     * @throws IllegalArgumentException if neither the configuration nor the exchange header provides a table id
+     * @throws Exception if an insert request fails or reports insert errors
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        List<Exchange> exchanges = prepareExchangeList(exchange);
+
+        List<Exchange> processGroup = new ArrayList<>();
+
+        String partitionDecorator = "";
+        String suffix = "";
+        String tableId = configuration.getTableId() == null ? "" : configuration.getTableId();
+        int totalProcessed = 0;
+
+        for (Exchange ex: exchanges) {
+            String tmpPartitionDecorator = ex.getIn().getHeader(GoogleBigQueryConstants.PARTITION_DECORATOR, "", String.class);
+            String tmpSuffix = ex.getIn().getHeader(GoogleBigQueryConstants.TABLE_SUFFIX, "", String.class);
+            String tmpTableId = ex.getIn().getHeader(GoogleBigQueryConstants.TABLE_ID, tableId, String.class);
+
+            if (tmpTableId.isEmpty()) {
+                throw new IllegalArgumentException("tableId need to be specified in one of endpoint configuration or exchange header");
+            }
+
+            // Ensure all rows of same request goes to same table and suffix
+            if (!tmpPartitionDecorator.equals(partitionDecorator) || !tmpSuffix.equals(suffix) || !tmpTableId.equals(tableId)) {
+                if (!processGroup.isEmpty()) {
+                    totalProcessed += process(tableId, partitionDecorator, suffix, processGroup, exchange.getExchangeId());
+                }
+                processGroup.clear();
+                partitionDecorator = tmpPartitionDecorator;
+                suffix = tmpSuffix;
+                // tmpTableId cannot be empty here: the empty case was rejected above.
+                tableId = tmpTableId;
+            }
+            processGroup.add(ex);
+        }
+        if (!processGroup.isEmpty()) {
+            totalProcessed += process(tableId, partitionDecorator, suffix, processGroup, exchange.getExchangeId());
+        }
+
+        if (totalProcessed == 0) {
+            logger.debug("The incoming message is either null or empty for exchange {}", exchange.getExchangeId());
+        }
+    }
+
+    /**
+     * Sends the rows of one group of exchanges in a single insertAll request.
+     *
+     * Exchanges whose body is neither a List nor a Map (including a null body) get an
+     * {@link IllegalArgumentException} set on them and contribute no rows.
+     *
+     * @param tableId the target table id
+     * @param partitionDecorator appended to the table id when not empty
+     * @param suffix sent as {@code template_suffix} when not empty
+     * @param exchanges the exchanges of the group
+     * @param exchangeId id of the originating exchange, used for logging only
+     * @return the number of rows sent, or 0 when the group produced no rows
+     * @throws Exception if the request fails or the response reports insert errors
+     */
+    @SuppressWarnings("unchecked")
+    private int process(String tableId, String partitionDecorator, String suffix, List<Exchange> exchanges, String exchangeId) throws Exception {
+        // NOTE(review): BigQuery documents '$' as the partition decorator separator
+        // (table$YYYYMMDD); '?' is kept as-is here - confirm which one is intended.
+        String tableIdWithPartition = Strings.isNullOrEmpty(partitionDecorator)
+                ? tableId
+                : (tableId + "?" + partitionDecorator);
+
+        List<TableDataInsertAllRequest.Rows> apiRequestRows = new ArrayList<>();
+        for (Exchange ex: exchanges) {
+            Object entryObject = ex.getIn().getBody();
+            if (entryObject instanceof List) {
+                for (Map<String, Object> entry: (List<Map<String, Object>>) entryObject) {
+                    apiRequestRows.add(createRowRequest(null, entry));
+                }
+            } else if (entryObject instanceof Map) {
+                apiRequestRows.add(createRowRequest(ex, (Map<String, Object>) entryObject));
+            } else {
+                // A null body must not turn this error report into a NullPointerException.
+                String bodyType = entryObject == null ? "null" : entryObject.getClass().toString();
+                ex.setException(new IllegalArgumentException("Cannot handle body type " + bodyType));
+            }
+        }
+
+        if (apiRequestRows.isEmpty()) {
+            return 0;
+        }
+
+        GoogleBigQueryEndpoint endpoint = getEndpoint();
+
+        TableDataInsertAllRequest apiRequestData = new TableDataInsertAllRequest().setRows(apiRequestRows);
+
+        Bigquery.Tabledata.InsertAll apiRequest = endpoint.getBigquery()
+                .tabledata()
+                .insertAll(configuration.getProjectId(),
+                        configuration.getDatasetId(),
+                        tableIdWithPartition,
+                        apiRequestData);
+        // The caller passes "" when no suffix header is present, so test for empty, not just null.
+        if (!Strings.isNullOrEmpty(suffix)) {
+            apiRequest.set("template_suffix", suffix);
+        }
+
+        logger.trace("uploader thread/id: {} / {} . calling google api", Thread.currentThread(), exchangeId);
+
+        TableDataInsertAllResponse apiResponse = apiRequest.execute();
+
+        if (apiResponse.getInsertErrors() != null && !apiResponse.getInsertErrors().isEmpty()) {
+            throw new Exception("InsertAll into " + tableId + " failed: " + apiResponse.getInsertErrors());
+        }
+
+        logger.debug("uploader thread/id: {} / {} . api call completed.", Thread.currentThread().getId(), exchangeId);
+        // Count the rows that were sent, not the fields of the request object.
+        return apiRequestRows.size();
+    }
+
+    /**
+     * Builds one request row from a map of field values.
+     *
+     * The insert id is taken from the map entry named by the configured insert-id field when
+     * one is configured; otherwise from the {@link GoogleBigQueryConstants#INSERT_ID} header of
+     * the exchange, when an exchange is given.
+     *
+     * @param exchange the exchange the map came from, or {@code null} for entries of a list body
+     * @param object the field values of the row
+     * @return the row, with a possibly {@code null} insert id
+     */
+    private TableDataInsertAllRequest.Rows createRowRequest(Exchange exchange, Map<String, Object> object) {
+        TableRow tableRow = new TableRow();
+        tableRow.putAll(object);
+        String insertId = null;
+        if (configuration.getUseAsInsertId() != null) {
+            // The field may hold a non-String value (e.g. a number); convert instead of casting.
+            Object insertIdValue = object.get(configuration.getUseAsInsertId());
+            insertId = insertIdValue == null ? null : insertIdValue.toString();
+        } else {
+            if (exchange != null) {
+                insertId = exchange.getIn().getHeader(GoogleBigQueryConstants.INSERT_ID, String.class);
+            }
+        }
+        TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows();
+        rows.setInsertId(insertId);
+        rows.setJson(tableRow);
+        return rows;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public GoogleBigQueryEndpoint getEndpoint() {
+        return (GoogleBigQueryEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Submits the exchange to the endpoint's executor service and returns immediately.
+     *
+     * Any failure of the processing is stored on the exchange, and the callback is always
+     * completed so the exchange cannot be left hanging.
+     *
+     * @param exchange the exchange to process
+     * @param callback notified with {@code done(false)} once processing has finished
+     * @return {@code false}, because the work continues asynchronously
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        getEndpoint().getExecutorService().submit(() -> {
+            try {
+                process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            } finally {
+                callback.done(false);
+            }
+        });
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/resources/META-INF/LICENSE.txt
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/resources/META-INF/LICENSE.txt b/components/camel-google-bigquery/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/resources/META-INF/NOTICE.txt
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/resources/META-INF/NOTICE.txt b/components/camel-google-bigquery/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+   =========================================================================
+   ==  NOTICE file corresponding to the section 4 d of                    ==
+   ==  the Apache License, Version 2.0,                                   ==
+   ==  in this case for the Apache Camel distribution.                    ==
+   =========================================================================
+
+   This product includes software developed by
+   The Apache Software Foundation (http://www.apache.org/).
+
+   Please read the different LICENSE files present in the licenses directory of
+   this distribution.

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/main/resources/META-INF/services/org/apache/camel/component/google-bigquery
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/main/resources/META-INF/services/org/apache/camel/component/google-bigquery b/components/camel-google-bigquery/src/main/resources/META-INF/services/org/apache/camel/component/google-bigquery
new file mode 100644
index 0000000..07053a6
--- /dev/null
+++ b/components/camel-google-bigquery/src/main/resources/META-INF/services/org/apache/camel/component/google-bigquery
@@ -0,0 +1,18 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+class=org.apache.camel.component.google.bigquery.GoogleBigQueryComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/AsyncTest.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/AsyncTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/AsyncTest.java
new file mode 100644
index 0000000..828c3fd
--- /dev/null
+++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/AsyncTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.bigquery.integration;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test that streams rows into BigQuery through an asynchronous
+ * route: exchanges enter on {@code direct:in}, are handed over to a SEDA queue
+ * and are then sent in-only to the BigQuery endpoint before reaching a mock.
+ */
+public class AsyncTest extends BigQueryTestSupport {
+    private static final String TABLE_ID = "asynctest";
+
+    /** Number of rows sent by {@link #sendAsync()}. */
+    private static final int MESSAGE_COUNT = 5;
+
+    @EndpointInject(uri = "direct:in")
+    private Endpoint directIn;
+
+    @EndpointInject(uri = "google-bigquery:{{project.id}}:{{bigquery.datasetId}}:" + TABLE_ID)
+    private Endpoint bigqueryEndpoint;
+
+    @EndpointInject(uri = "mock:sendResult")
+    private MockEndpoint sendResult;
+
+    @Produce(uri = "direct:in")
+    private ProducerTemplate producer;
+
+    /**
+     * Makes sure the target table exists before each test.
+     *
+     * @throws Exception if the table cannot be created
+     */
+    @Before
+    public void init() throws Exception {
+        createBqTable(TABLE_ID);
+    }
+
+    /**
+     * Builds the two routes under test: {@code direct:in -> seda:seda} and
+     * {@code seda:seda -> BigQuery (in-only) -> mock:sendResult}.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(directIn)
+                        .to("seda:seda");
+                from("seda:seda")
+                        .routeId("Async")
+                        .inOnly(bigqueryEndpoint)
+                        .log(LoggingLevel.INFO, "To sendresult")
+                        .to(sendResult);
+            }
+        };
+    }
+
+    /**
+     * Sends {@value #MESSAGE_COUNT} rows with random column values and checks
+     * that every one of them reached the mock and can be queried back.
+     *
+     * @throws Exception if sending, waiting on the mock or querying fails
+     */
+    @Test
+    public void sendAsync() throws Exception {
+        // Register the expectation BEFORE sending: the SEDA consumer delivers
+        // asynchronously, so exchanges can reach the mock while this loop is
+        // still running.
+        sendResult.expectedMessageCount(MESSAGE_COUNT);
+
+        List<Map<String, String>> objects = new ArrayList<>();
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Exchange exchange = new DefaultExchange(context);
+            String uuidCol1 = UUID.randomUUID().toString();
+            String uuidCol2 = UUID.randomUUID().toString();
+
+            Map<String, String> object = new HashMap<>();
+            object.put("col1", uuidCol1);
+            object.put("col2", uuidCol2);
+            objects.add(object);
+            exchange.getIn().setBody(object);
+            producer.send(exchange);
+        }
+
+        sendResult.assertIsSatisfied(4000);
+
+        for (Map<String, String> object : objects) {
+            assertRowExist(TABLE_ID, object);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/BigQueryTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/BigQueryTestSupport.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/BigQueryTestSupport.java
new file mode 100644
index 0000000..6dda547
--- /dev/null
+++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/BigQueryTestSupport.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.bigquery.integration;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.bigquery.model.QueryRequest;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.google.bigquery.GoogleBigQueryComponent;
+import org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory;
+import org.apache.camel.component.properties.PropertiesComponent;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+/**
+ * Base class for the BigQuery integration tests.
+ * <p>
+ * Connection settings are read once from {@code simple.properties} on the test
+ * classpath. The class registers the {@code google-bigquery} component in the
+ * Camel context and offers helpers to create tables and to verify inserted rows.
+ */
+public class BigQueryTestSupport extends CamelTestSupport {
+    public static final String SERVICE_KEY;
+    public static final String SERVICE_ACCOUNT;
+    public static final String PROJECT_ID;
+    public static final String DATASET_ID;
+    public static final String SERVICE_URL;
+    public static final String CREDENTIALS_FILE_LOCATION;
+
+    /** Created in {@link #addBigqueryComponent(CamelContext)}; null until the context is built. */
+    private GoogleBigQueryConnectionFactory connectionFactory;
+
+    static {
+        Properties testProperties = loadProperties();
+        SERVICE_KEY = testProperties.getProperty("service.key");
+        SERVICE_ACCOUNT = testProperties.getProperty("service.account");
+        PROJECT_ID = testProperties.getProperty("project.id");
+        DATASET_ID = testProperties.getProperty("bigquery.datasetId");
+        SERVICE_URL = testProperties.getProperty("test.serviceURL");
+        CREDENTIALS_FILE_LOCATION = testProperties.getProperty("service.credentialsFileLocation");
+    }
+
+    /**
+     * Loads {@code simple.properties} from the test classpath.
+     *
+     * @return the loaded properties
+     * @throws IllegalStateException if the resource is not on the classpath
+     * @throws RuntimeException      if the resource cannot be read
+     */
+    private static Properties loadProperties() {
+        Properties testProperties = new Properties();
+        InputStream fileIn = BigQueryTestSupport.class.getClassLoader().getResourceAsStream("simple.properties");
+        if (fileIn == null) {
+            // Fail with a readable message instead of an NPE from Properties.load(null).
+            throw new IllegalStateException("simple.properties not found on the test classpath");
+        }
+        try (InputStream in = fileIn) {
+            testProperties.load(in);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return testProperties;
+    }
+
+    /**
+     * Registers the {@code google-bigquery} component, backed by a connection
+     * factory built from the test properties, together with a
+     * {@code properties} component that resolves placeholders from the
+     * registry entry {@code prop}.
+     *
+     * @param context the Camel context to add the components to
+     */
+    protected void addBigqueryComponent(CamelContext context) {
+
+        connectionFactory = new GoogleBigQueryConnectionFactory()
+                .setServiceAccount(SERVICE_ACCOUNT)
+                .setServiceAccountKey(SERVICE_KEY)
+                .setServiceURL(SERVICE_URL);
+
+        GoogleBigQueryComponent component = new GoogleBigQueryComponent();
+        component.setConnectionFactory(connectionFactory);
+
+        context.addComponent("google-bigquery", component);
+        context.addComponent("properties", new PropertiesComponent("ref:prop"));
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        addBigqueryComponent(context);
+        return context;
+    }
+
+    /**
+     * Binds the test properties under the name {@code prop} so that the
+     * {@code properties} component can resolve them.
+     */
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("prop", loadProperties());
+        return jndi;
+    }
+
+    /**
+     * @return the connection factory used by the registered BigQuery component
+     */
+    public GoogleBigQueryConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    /**
+     * Asserts that exactly one row of {@code tableName} matches all the given
+     * column values.
+     * <p>
+     * Column names and values are concatenated into the query text as quoted
+     * literals, so this helper must only be used with trusted test data.
+     *
+     * @param tableName table (inside {@link #DATASET_ID}) to query
+     * @param row       column name to expected value
+     * @throws Exception if the query cannot be executed
+     */
+    protected void assertRowExist(String tableName, Map<String, String> row) throws Exception {
+        String conditions = row.entrySet().stream()
+                .map(e -> e.getKey() + " = '" + e.getValue() + "'")
+                .collect(Collectors.joining(" AND "));
+        String query = "SELECT * FROM " + DATASET_ID + "." + tableName + " WHERE " + conditions;
+        log.debug("Query: {}", query);
+
+        QueryRequest queryRequest = new QueryRequest();
+        queryRequest.setQuery(query);
+        QueryResponse queryResponse = getConnectionFactory()
+                .getDefaultClient()
+                .jobs()
+                .query(PROJECT_ID, queryRequest)
+                .execute();
+
+        // Guard against a response without a rows list so that "no match"
+        // fails the assertion instead of throwing a NullPointerException.
+        int rowCount = queryResponse.getRows() == null ? 0 : queryResponse.getRows().size();
+        assertEquals("Rows matching query: " + query, 1, rowCount);
+    }
+
+    /**
+     * Creates the table {@code tableId} in the test dataset using the schema
+     * from {@code /schema/simple-table.json}. An HTTP 409 response (table
+     * already exists) is logged and ignored.
+     *
+     * @param tableId id of the table to create
+     * @throws Exception if the schema cannot be read or the insert fails for
+     *                   any reason other than a conflict
+     */
+    protected void createBqTable(String tableId) throws Exception {
+        TableReference reference = new TableReference()
+                .setTableId(tableId)
+                .setDatasetId(DATASET_ID)
+                .setProjectId(PROJECT_ID);
+        TableSchema schema;
+        try (InputStream in = this.getClass().getResourceAsStream("/schema/simple-table.json")) {
+            schema = readDefinition(in);
+        }
+        Table table = new Table()
+                .setTableReference(reference)
+                .setSchema(schema);
+        try {
+            getConnectionFactory().getDefaultClient().tables()
+                    .insert(PROJECT_ID, DATASET_ID, table)
+                    .execute();
+        } catch (GoogleJsonResponseException e) {
+            if (e.getDetails().getCode() == 409) {
+                log.info("Table {} already exist", tableId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Reads a JSON array of field definitions and wraps it in a
+     * {@link TableSchema}.
+     *
+     * @param schemaInputStream stream containing the JSON array
+     * @return a schema whose fields are the parsed list
+     * @throws Exception if the JSON cannot be parsed
+     */
+    private TableSchema readDefinition(InputStream schemaInputStream) throws Exception {
+        TableSchema schema = new TableSchema();
+
+        ObjectMapper mapper = new ObjectMapper();
+        // NOTE(review): binding to the raw ArrayList.class means Jackson does not
+        // know the element type, so the elements are presumably generic maps
+        // rather than TableFieldSchema instances - confirm before iterating them.
+        @SuppressWarnings("unchecked")
+        List<TableFieldSchema> fields = mapper.readValue(schemaInputStream, ArrayList.class);
+
+        schema.setFields(fields);
+
+        return schema;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7759b260/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/DynamicTableIdTest.java
----------------------------------------------------------------------
diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/DynamicTableIdTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/DynamicTableIdTest.java
new file mode 100644
index 0000000..93888ab
--- /dev/null
+++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/DynamicTableIdTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.bigquery.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.bigquery.GoogleBigQueryConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for an endpoint URI without a table id: the target table
+ * is selected per exchange through the {@link GoogleBigQueryConstants#TABLE_ID}
+ * header.
+ */
+public class DynamicTableIdTest extends BigQueryTestSupport {
+    private static final String TABLE_ID_1 = "dynamic_table_1";
+    private static final String TABLE_ID_2 = "dynamic_table_2";
+
+    @EndpointInject(uri = "direct:in")
+    private Endpoint directIn;
+
+    @EndpointInject(uri = "google-bigquery:{{project.id}}:{{bigquery.datasetId}}")
+    private Endpoint bigqueryEndpoint;
+
+    @EndpointInject(uri = "mock:sendResult")
+    private MockEndpoint sendResult;
+
+    @Produce(uri = "direct:in")
+    private ProducerTemplate producer;
+
+    /** Creates both target tables before each test. */
+    @Before
+    public void init() throws Exception {
+        for (String tableId : new String[] {TABLE_ID_1, TABLE_ID_2}) {
+            createBqTable(tableId);
+        }
+    }
+
+    /** Single route: {@code direct:in -> BigQuery -> mock:sendResult}. */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(directIn).routeId("DynamicTable").to(bigqueryEndpoint).to(sendResult);
+            }
+        };
+    }
+
+    /**
+     * Sends one row to each table, choosing the table through the header, and
+     * verifies that each row ended up in the table it was addressed to.
+     */
+    @Test
+    public void dynamicTable() throws Exception {
+        Map<String, String> firstRow = randomRow();
+        Exchange firstExchange = exchangeFor(TABLE_ID_1, firstRow);
+
+        Map<String, String> secondRow = randomRow();
+        Exchange secondExchange = exchangeFor(TABLE_ID_2, secondRow);
+
+        sendResult.expectedMessageCount(2);
+        producer.send(firstExchange);
+        producer.send(secondExchange);
+        sendResult.assertIsSatisfied(4000);
+
+        assertRowExist(TABLE_ID_1, firstRow);
+        assertRowExist(TABLE_ID_2, secondRow);
+    }
+
+    /** Builds a row whose {@code col1} and {@code col2} hold fresh random UUID strings. */
+    private Map<String, String> randomRow() {
+        Map<String, String> row = new HashMap<>();
+        row.put("col1", UUID.randomUUID().toString());
+        row.put("col2", UUID.randomUUID().toString());
+        return row;
+    }
+
+    /** Wraps {@code row} in a new exchange addressed to {@code tableId} via the table-id header. */
+    private Exchange exchangeFor(String tableId, Map<String, String> row) {
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setHeader(GoogleBigQueryConstants.TABLE_ID, tableId);
+        exchange.getIn().setBody(row);
+        return exchange;
+    }
+
+}