You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by dj...@apache.org on 2020/03/23 10:07:56 UTC
[gora] branch master updated: GORA-546 Hazelcast Jet execution
engine support (#175)
This is an automated email from the ASF dual-hosted git repository.
djkevincr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push:
new 392b264 GORA-546 Hazelcast Jet execution engine support (#175)
392b264 is described below
commit 392b264fdb0da86556a632be5a12d41da3512f49
Author: Lahiru Jayasekara <ml...@gmail.com>
AuthorDate: Mon Mar 23 15:37:47 2020 +0530
GORA-546 Hazelcast Jet execution engine support (#175)
* Add hazelcast jet source initial impls
* Add HazelcastJet Source Vertex with distributed execution support
* Rename class name
* Improve test cases
* Add gora-jet sink connector implementations
* Refactor code and moved into gora-jet module
* Fix Indentation to 2 spaces
* Fix injecting datastore key into jet source
* Fix JetSource data partitioning
* Add Apache License
* Remove printStackTrace
* Add java doc comments
* Fix test cases via hbase testing utilities
* Remove printStackTrace
* Resolve hazelcast dependency conflict
* Resolve issues from review
* Add LogAnalyticsJet tutorial class
* Move gora-jet module up in pom.xml
* Change snapshot version in gora-jet module
* Add license header
---
gora-jet/conf/gora-aerospike-mapping.xml | 44 +
gora-jet/conf/gora-cassandra-mapping.xml | 51 ++
gora-jet/conf/gora-couchdb-mapping.xml | 41 +
gora-jet/conf/gora-hbase-mapping.xml | 55 ++
gora-jet/conf/gora-solr-mapping.xml | 43 +
gora-jet/conf/gora-sql-mapping.xml | 43 +
gora-jet/conf/gora.properties | 66 ++
gora-jet/conf/hazelcast-client.xml | 31 +
gora-jet/conf/hazelcast.xml | 34 +
gora-jet/pom.xml | 131 +++
.../main/java/org/apache/gora/jet/JetEngine.java | 51 ++
.../org/apache/gora/jet/JetInputOutputFormat.java | 50 ++
.../src/main/java/org/apache/gora/jet/JetSink.java | 91 +++
.../main/java/org/apache/gora/jet/JetSource.java | 109 +++
gora-jet/src/test/avro/pageview.json | 15 +
gora-jet/src/test/avro/resultPageView.json | 10 +
.../src/test/java/org/apache/gora/jet/JetTest.java | 150 ++++
.../org/apache/gora/jet/generated/Pageview.java | 881 +++++++++++++++++++++
.../apache/gora/jet/generated/ResultPageView.java | 468 +++++++++++
gora-tutorial/pom.xml | 5 +
.../apache/gora/tutorial/log/LogAnalyticsJet.java | 108 +++
pom.xml | 16 +-
22 files changed, 2492 insertions(+), 1 deletion(-)
diff --git a/gora-jet/conf/gora-aerospike-mapping.xml b/gora-jet/conf/gora-aerospike-mapping.xml
new file mode 100644
index 0000000..5506880
--- /dev/null
+++ b/gora-jet/conf/gora-aerospike-mapping.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Gora Mapping file for Aerospike Backend
+-->
+<gora-otd>
+
+ <policy name="write" gen="NONE" recordExists="UPDATE" commitLevel="COMMIT_ALL" durableDelete="false"/>
+ <policy name="read" priority="DEFAULT" consistencyLevel="CONSISTENCY_ONE" replica="SEQUENCE" maxRetries="2"/>
+
+ <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" set="AccessLog" namespace = "test">
+ <field name="url" bin="url"/>
+ <field name="timestamp" bin="timestamp"/>
+ <field name="ip" bin="ip" />
+ <field name="httpMethod" bin="httpMethod"/>
+ <field name="httpStatusCode" bin="httpStatusCode"/>
+ <field name="responseSize" bin="responseSize"/>
+ <field name="referrer" bin="referrer"/>
+ <field name="userAgent" bin="userAgent"/>
+ </class>
+
+ <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" set="Metrics" namespace = "test">
+ <field name="metricDimension" bin="metricDimension"/>
+ <field name="timestamp" bin="ts"/>
+ <field name="metric" bin="metric"/>
+ </class>
+
+</gora-otd>
diff --git a/gora-jet/conf/gora-cassandra-mapping.xml b/gora-jet/conf/gora-cassandra-mapping.xml
new file mode 100644
index 0000000..bb0275f
--- /dev/null
+++ b/gora-jet/conf/gora-cassandra-mapping.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Gora Mapping file for Cassandra Backend
+-->
+<gora-otd>
+
+ <keyspace name="Pageview" cluster="Test Cluster" host="localhost">
+ <family name="common"/>
+ <family name="http"/>
+ <family name="misc"/>
+ </keyspace>
+
+ <keyspace name="Metrics" cluster="Test Cluster" host="localhost">
+ <family name="common"/>
+ </keyspace>
+
+ <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" keyspace="Pageview">
+ <field name="url" family="common" qualifier="url"/>
+ <field name="timestamp" family="common" qualifier="timestamp"/>
+ <field name="ip" family="common" qualifier="ip" />
+ <field name="httpMethod" family="http" qualifier="httpMethod"/>
+ <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
+ <field name="responseSize" family="http" qualifier="responseSize"/>
+ <field name="referrer" family="misc" qualifier="referrer"/>
+ <field name="userAgent" family="misc" qualifier="userAgent"/>
+ </class>
+
+ <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" keyspace="Metrics">
+ <field name="metricDimension" family="common" qualifier="metricDimension"/>
+ <field name="timestamp" family="common" qualifier="ts"/>
+ <field name="metric" family="common" qualifier="metric"/>
+ </class>
+
+</gora-otd>
diff --git a/gora-jet/conf/gora-couchdb-mapping.xml b/gora-jet/conf/gora-couchdb-mapping.xml
new file mode 100644
index 0000000..2cbee77
--- /dev/null
+++ b/gora-jet/conf/gora-couchdb-mapping.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Gora Mapping file for CouchDB Backend
+-->
+<gora-otd>
+
+ <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" document="AccessLog">
+ <field name="url"/>
+ <field name="timestamp"/>
+ <field name="ip"/>
+ <field name="httpMethod"/>
+ <field name="httpStatusCode"/>
+ <field name="responseSize"/>
+ <field name="referrer"/>
+ <field name="userAgent"/>
+ </class>
+
+ <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" document="Metrics">
+ <field name="metricDimension"/>
+ <field name="timestamp"/>
+ <field name="metric"/>
+ </class>
+
+</gora-otd>
diff --git a/gora-jet/conf/gora-hbase-mapping.xml b/gora-jet/conf/gora-hbase-mapping.xml
new file mode 100644
index 0000000..9722144
--- /dev/null
+++ b/gora-jet/conf/gora-hbase-mapping.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Gora Mapping file for HBase Backend
+-->
+<gora-otd>
+ <table name="Pageview"> <!-- optional descriptors for tables -->
+ <family name="common"/> <!-- This can also have params like compression, bloom filters -->
+ <family name="http"/>
+ <family name="misc"/>
+ </table>
+
+ <table name="ResultPageView"> <!-- optional descriptors for tables -->
+ <family name="common"/> <!-- This can also have params like compression, bloom filters -->
+ </table>
+
+ <class name="org.apache.gora.jet.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
+ <field name="url" family="common" qualifier="url"/>
+ <field name="timestamp" family="common" qualifier="timestamp"/>
+ <field name="ip" family="common" qualifier="ip" />
+ <field name="httpMethod" family="http" qualifier="httpMethod"/>
+ <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
+ <field name="responseSize" family="http" qualifier="responseSize"/>
+ <field name="referrer" family="misc" qualifier="referrer"/>
+ <field name="userAgent" family="misc" qualifier="userAgent"/>
+ </class>
+
+ <class name="org.apache.gora.jet.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
+ <field name="metricDimension" family="common" qualifier="metricDimension"/>
+ <field name="timestamp" family="common" qualifier="ts"/>
+ <field name="metric" family="common" qualifier="metric"/>
+ </class>
+
+ <class name="org.apache.gora.jet.generated.ResultPageView" keyClass="java.lang.Long" table="Results">
+ <field name="url" family="common" qualifier="url"/>
+ <field name="timestamp" family="common" qualifier="timestamp"/>
+ <field name="ip" family="common" qualifier="ip" />
+ </class>
+</gora-otd>
diff --git a/gora-jet/conf/gora-solr-mapping.xml b/gora-jet/conf/gora-solr-mapping.xml
new file mode 100644
index 0000000..5fe10fb
--- /dev/null
+++ b/gora-jet/conf/gora-solr-mapping.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Gora Mapping file for Solr Backend
+-->
+<gora-otd>
+ <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
+ <primarykey column="line"/>
+ <field name="url" column="url"/>
+ <field name="timestamp" column="timestamp"/>
+ <field name="ip" column="ip"/>
+ <field name="httpMethod" column="httpMethod"/>
+ <field name="httpStatusCode" column="httpStatusCode"/>
+ <field name="responseSize" column="responseSize"/>
+ <field name="referrer" column="referrer"/>
+ <field name="userAgent" column="userAgent"/>
+ </class>
+
+ <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
+ <primarykey column="id"/>
+ <field name="metricDimension" column="metricDimension"/>
+ <field name="timestamp" column="ts"/>
+ <field name="metric" column="metric"/>
+ </class>
+
+</gora-otd>
+
diff --git a/gora-jet/conf/gora-sql-mapping.xml b/gora-jet/conf/gora-sql-mapping.xml
new file mode 100644
index 0000000..90f1cfc
--- /dev/null
+++ b/gora-jet/conf/gora-sql-mapping.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Gora Mapping file for SQL Backend
+-->
+<gora-otd>
+ <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
+ <primarykey column="line"/>
+ <field name="url" column="url" length="512" primarykey="true"/>
+ <field name="timestamp" column="timestamp"/>
+ <field name="ip" column="ip" length="16"/>
+ <field name="httpMethod" column="httpMethod" length="6"/>
+ <field name="httpStatusCode" column="httpStatusCode"/>
+ <field name="responseSize" column="responseSize"/>
+ <field name="referrer" column="referrer" length="512"/>
+ <field name="userAgent" column="userAgent" length="512"/>
+ </class>
+
+ <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics">
+ <primarykey column="id" length="512"/>
+ <field name="metricDimension" column="metricDimension" length="512"/>
+ <field name="timestamp" column="ts"/>
+ <field name="metric" column="metric"/>
+ </class>
+
+</gora-otd>
+
diff --git a/gora-jet/conf/gora.properties b/gora-jet/conf/gora.properties
new file mode 100644
index 0000000..ddce0ed
--- /dev/null
+++ b/gora-jet/conf/gora.properties
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+##gora.datastore.default is the default datastore implementation to use
+##if it is not passed to the DataStoreFactory#createDataStore() method.
+gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
+#gora.datastore.default=org.apache.gora.couchdb.store.CouchDBStore
+#gora.datastore.default=org.apache.gora.cassandra.store.CassandraStore
+#gora.datastore.default=org.apache.gora.solr.store.SolrStore
+#gora.datastore.default=org.apache.gora.aerospike.store.AerospikeStore
+
+#gora.datastore.default=org.apache.gora.avro.store.AvroStore
+#gora.avrostore.input.path=hdfs://localhost:9000/gora.avrostore.test.input
+#gora.avrostore.output.path=hdfs://localhost:9000/gora.avrostore.test.output
+#gora.avrostore.codec.type=JSON || BINARY
+
+##whether to create schema automatically if not exists.
+gora.datastore.autocreateschema=true
+
+##Cassandra properties for gora-cassandra module using Cassandra
+#gora.cassandrastore.servers=localhost:9160
+
+##JDBC properties for gora-sql module using HSQL
+gora.sqlstore.jdbc.driver=org.hsqldb.jdbcDriver
+##HSQL jdbc connection as persistent in-process database
+gora.sqlstore.jdbc.url=jdbc:hsqldb:file:./hsql-data
+
+##HSQL jdbc connection as network server
+#gora.sqlstore.jdbc.url=jdbc:hsqldb:hsql://localhost/goratest
+
+##JDBC properties for gora-sql module using MySQL
+#gora.sqlstore.jdbc.driver=com.mysql.jdbc.Driver
+#gora.sqlstore.jdbc.url=jdbc:mysql://localhost:3306/goratest
+#gora.sqlstore.jdbc.user=root
+#gora.sqlstore.jdbc.password=
+
+gora.solrstore.solr.url=http://localhost:8983/solr
+gora.solrstore.solr.commitwithin=0
+gora.solrstore.solr.batchsize=100
+# set which Solrj server impl you wish to use
+# cloud, concurrent, http, loadbalance
+gora.solrstore.solr.solrjserver=http
+
+#JCache dataStore properties
+gora.cache.datastore.default=org.apache.gora.jcache.store.JCacheStore
+gora.datastore.jcache.provider=com.hazelcast.cache.impl.HazelcastServerCachingProvider
+#gora.datastore.jcache.provider=com.hazelcast.client.cache.impl.HazelcastClientCachingProvider
+
+##Aerospike dataStore properties
+#gora.aerospikestore.server.ip=localhost
+#gora.aerospikestore.server.port=3000
+#gora.aerospikestore.server.username=
+#gora.aerospikestore.server.password=
diff --git a/gora-jet/conf/hazelcast-client.xml b/gora-jet/conf/hazelcast-client.xml
new file mode 100644
index 0000000..93f21e2
--- /dev/null
+++ b/gora-jet/conf/hazelcast-client.xml
@@ -0,0 +1,31 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Hazelcast client side cache provider configuration.
+-->
+
+<hazelcast-client xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.hazelcast.com/schema/client-config
+ http://www.hazelcast.com/schema/client-config/hazelcast-client-config-3.6.xsd"
+ xmlns="http://www.hazelcast.com/schema/client-config">
+ <network>
+ <cluster-members>
+ <address>127.0.0.1</address>
+ </cluster-members>
+ </network>
+</hazelcast-client>
diff --git a/gora-jet/conf/hazelcast.xml b/gora-jet/conf/hazelcast.xml
new file mode 100755
index 0000000..9838710
--- /dev/null
+++ b/gora-jet/conf/hazelcast.xml
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Hazelcast server side cache provider configuration.
+-->
+
+<hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.hazelcast.com/schema/config
+ http://www.hazelcast.com/schema/config/hazelcast-config-3.6.xsd"
+ xmlns="http://www.hazelcast.com/schema/config">
+ <network>
+ <join>
+ <multicast enabled="false"/>
+ <tcp-ip enabled="true">
+ <member>127.0.0.1</member>
+ </tcp-ip>
+ </join>
+ </network>
+</hazelcast>
diff --git a/gora-jet/pom.xml b/gora-jet/pom.xml
new file mode 100644
index 0000000..36b1734
--- /dev/null
+++ b/gora-jet/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <artifactId>gora-jet</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Apache Gora :: Jet</name>
+ <url>http://gora.apache.org</url>
+ <description>Jet -> Gora -> Jet Sink and Source connectors</description>
+ <inceptionYear>2010</inceptionYear>
+ <organization>
+ <name>The Apache Software Foundation</name>
+ <url>http://www.apache.org/</url>
+ </organization>
+
+ <properties>
+ <osgi.import>*</osgi.import>
+ <osgi.export>org.apache.gora.jet*;version="${project.version}";-noimport:=true</osgi.export>
+ </properties>
+
+ <build>
+ <directory>target</directory>
+ <outputDirectory>target/classes</outputDirectory>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <testOutputDirectory>target/test-classes</testOutputDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ </resource>
+ <resource>
+ <directory>${basedir}/conf</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>${build-helper-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/examples/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+
+ <!-- Gora Internal Dependencies -->
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-hbase</artifactId>
+ </dependency>
+
+ <!-- jet Dependencies -->
+ <dependency>
+ <groupId>com.hazelcast.jet</groupId>
+ <artifactId>hazelcast-jet</artifactId>
+ </dependency>
+
+ <!-- Logging Dependencies -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.5.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/gora-jet/src/main/java/org/apache/gora/jet/JetEngine.java b/gora-jet/src/main/java/org/apache/gora/jet/JetEngine.java
new file mode 100644
index 0000000..004b561
--- /dev/null
+++ b/gora-jet/src/main/java/org/apache/gora/jet/JetEngine.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.jet;
+
+import com.hazelcast.jet.pipeline.BatchSource;
+import com.hazelcast.jet.pipeline.Sink;
+import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.Sources;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Core Gora - Jet integration class: builds Jet batch sources and sinks that are backed by Gora data stores.
+ */
+public class JetEngine<KeyIn, ValueIn extends PersistentBase, KeyOut, ValueOut extends PersistentBase> {
+ static DataStore dataOutStore; // static, so shared by every JetEngine instance; written by createDataSink, read by SinkProcessor
+ static DataStore dataInStore; // static, so shared by every JetEngine instance; written by createDataSource
+ static Query query; // written by createDataSource
+
+ public BatchSource<JetInputOutputFormat<KeyIn, ValueIn>> createDataSource(DataStore<KeyIn, ValueIn> dataOutStore) { // convenience overload
+ return createDataSource(dataOutStore, dataOutStore.newQuery()); // delegates with a query obtained from the store's newQuery()
+ }
+
+ public BatchSource<JetInputOutputFormat<KeyIn, ValueIn>> createDataSource(DataStore<KeyIn, ValueIn> dataOutStore,
+ Query<KeyIn, ValueIn> query) {
+ JetEngine.dataInStore = dataOutStore; // NOTE(review): despite its name, this parameter is recorded as the *input* store
+ JetEngine.query = query;
+ return Sources.batchFromProcessor("gora-jet-source", new JetSource<KeyIn, ValueIn>()); // JetSource gets no constructor arguments; presumably it reads the static fields set above (TODO confirm)
+ }
+
+ public Sink<JetInputOutputFormat<KeyOut, ValueOut>> createDataSink(DataStore<KeyOut, ValueOut> dataOutStore) {
+ JetEngine.dataOutStore = dataOutStore; // replaces any store set by an earlier createDataSink call (the field is static)
+ return Sinks.fromProcessor("gora-jet-sink", new JetSink<KeyOut, ValueOut>()); // JetSink gets no constructor arguments; its SinkProcessor reads JetEngine.dataOutStore
+ }
+}
diff --git a/gora-jet/src/main/java/org/apache/gora/jet/JetInputOutputFormat.java b/gora-jet/src/main/java/org/apache/gora/jet/JetInputOutputFormat.java
new file mode 100644
index 0000000..b4652be
--- /dev/null
+++ b/gora-jet/src/main/java/org/apache/gora/jet/JetInputOutputFormat.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.jet;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+
+/**
+ * Mutable key/value pair used as the item type of the Gora jet-source and of the
+ * Gora jet-sink (the sink puts each pair's key and value into its data store).
+ */
+public class JetInputOutputFormat<KeyOut, ValueOut extends PersistentBase> {
+ KeyOut key; // package-private and non-final; also reachable through getKey/setKey
+ ValueOut value; // package-private and non-final; also reachable through getValue/setValue
+
+ public JetInputOutputFormat(KeyOut key, ValueOut value) { // stores both references as given; no copy, no null check
+ this.key = key;
+ this.value = value;
+ }
+
+ public KeyOut getKey() {
+ return key;
+ }
+
+ public void setKey(KeyOut key) {
+ this.key = key;
+ }
+
+ public ValueOut getValue() {
+ return value;
+ }
+
+ public void setValue(ValueOut value) {
+ this.value = value;
+ }
+}
diff --git a/gora-jet/src/main/java/org/apache/gora/jet/JetSink.java b/gora-jet/src/main/java/org/apache/gora/jet/JetSink.java
new file mode 100644
index 0000000..dcdb28e
--- /dev/null
+++ b/gora-jet/src/main/java/org/apache/gora/jet/JetSink.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.jet;
+
+import com.hazelcast.jet.core.AbstractProcessor;
+import com.hazelcast.jet.core.ProcessorMetaSupplier;
+import com.hazelcast.jet.core.ProcessorSupplier;
+import com.hazelcast.nio.Address;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+
+/**
+ * jet-sink implementation: a ProcessorMetaSupplier that maps every member address to a supplier of SinkProcessor instances.
+ */
+public class JetSink<KeyOut, ValueOut extends PersistentBase> implements ProcessorMetaSupplier {
+
+ private transient int localParallelism; // assigned in init() from the context; transient, so excluded from Java serialization
+
+ @Override
+ public void init(@Nonnull Context context) {
+ localParallelism = context.localParallelism();
+ }
+
+ @Nonnull
+ @Override
+ public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) {
+ Map<Address, ProcessorSupplier> map = new HashMap<>(); // one ProcessorSupplier per member address
+ for (int i = 0; i < addresses.size(); i++) {
+ // globalIndexBase is the global index of the first processor on the i-th Jet cluster member
+ int globalIndexBase = localParallelism * i;
+
+ // processorCount will be equal to localParallelism:
+ ProcessorSupplier supplier = processorCount ->
+ range(globalIndexBase, globalIndexBase + processorCount)
+ .mapToObj(globalIndex ->
+ new SinkProcessor<KeyOut, ValueOut>() // globalIndex is unused: every index yields an identical, argument-less SinkProcessor
+ ).collect(toList());
+ map.put(addresses.get(i), supplier);
+ }
+ return map::get; // lookup by address in the map built above
+ }
+}
+
+class SinkProcessor<KeyOut, ValueOut extends PersistentBase> extends AbstractProcessor { // writes every received item to the static JetEngine.dataOutStore
+
+ @Override
+ public boolean isCooperative() {
+ return false; // declared non-cooperative - presumably because data store writes may block (TODO confirm)
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected boolean tryProcess(int ordinal, Object item) throws Exception {
+ JetEngine.dataOutStore.put(((JetInputOutputFormat<KeyOut, ValueOut>) item).getKey(),
+ ((JetInputOutputFormat<KeyOut, ValueOut>) item).getValue()); // unchecked cast: every item is assumed to be a JetInputOutputFormat; ordinal is ignored
+ return true; // always reports the item as processed
+ }
+
+ @Override
+ public void close() {
+ try {
+ JetEngine.dataOutStore.flush(); // flushes the shared static store each time a processor instance is closed
+ } catch (GoraException e) {
+ throw new RuntimeException(e); // close() declares no checked exception here, so GoraException is wrapped with its cause
+ }
+ }
+}
diff --git a/gora-jet/src/main/java/org/apache/gora/jet/JetSource.java b/gora-jet/src/main/java/org/apache/gora/jet/JetSource.java
new file mode 100644
index 0000000..86d1d23
--- /dev/null
+++ b/gora-jet/src/main/java/org/apache/gora/jet/JetSource.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.jet;
+
+import com.hazelcast.jet.Traverser;
+import com.hazelcast.jet.core.AbstractProcessor;
+import com.hazelcast.jet.core.ProcessorMetaSupplier;
+import com.hazelcast.jet.core.ProcessorSupplier;
+import com.hazelcast.nio.Address;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Result;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static com.hazelcast.jet.Traversers.traverseIterable;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+
+/**
+ * Jet source connector for Gora.
+ *
+ * <p>Acts as the {@link ProcessorMetaSupplier} of the source vertex. Every
+ * processor in the cluster gets a global index, and the partitions returned by
+ * {@code JetEngine.dataInStore.getPartitions(JetEngine.query)} are dealt out
+ * round-robin by that index, so each partition is read by exactly one processor.
+ *
+ * @param <KeyIn>   key type of the records read from the input data store
+ * @param <ValueIn> persistent value type of the records read from the input data store
+ */
+public class JetSource<KeyIn, ValueIn extends PersistentBase> implements ProcessorMetaSupplier {
+
+  // Not transient: the processor supplier lambda created in get() calls
+  // getPartitionedData(), which reads this field.
+  private int totalParallelism;
+  private transient int localParallelism;
+
+  /**
+   * Records the total and per-member parallelism reported by Jet.
+   *
+   * @param context meta-supplier context provided by Jet
+   */
+  @Override
+  public void init(@Nonnull Context context) {
+    totalParallelism = context.totalParallelism();
+    localParallelism = context.localParallelism();
+  }
+
+  /**
+   * Builds the address-to-supplier lookup used to create the source processors.
+   *
+   * @param addresses addresses of the members that will run the source vertex
+   * @return a function returning the processor supplier registered for a given
+   *         address, or {@code null} for an address that was not in {@code addresses}
+   */
+  @Nonnull
+  @Override
+  public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) {
+    Map<Address, ProcessorSupplier> map = new HashMap<>();
+    for (int i = 0; i < addresses.size(); i++) {
+      // globalIndexBase is the global index of the first processor on member i
+      int globalIndexBase = localParallelism * i;
+
+      // processorCount is the number of processors Jet requests for the member;
+      // each processor is created with the data of its own share of partitions
+      ProcessorSupplier supplier = processorCount ->
+          range(globalIndexBase, globalIndexBase + processorCount)
+              .mapToObj(globalIndex ->
+                  new GoraJetProcessor<KeyIn, ValueIn>(getPartitionedData(globalIndex)))
+              .collect(toList());
+      map.put(addresses.get(i), supplier);
+    }
+    return map::get;
+  }
+
+  /**
+   * Reads all records of the partitions assigned to one processor.
+   *
+   * <p>The processor with global index {@code g} reads partitions
+   * {@code g, g + totalParallelism, g + 2 * totalParallelism, ...}. The records
+   * are collected into a list before the processor is created.
+   *
+   * @param globalIndex cluster-wide index of the processor
+   * @return the key/value pairs of every record in the assigned partitions
+   * @throws RuntimeException wrapping any failure while querying the data store
+   */
+  @SuppressWarnings("unchecked")
+  private List<JetInputOutputFormat<KeyIn, ValueIn>> getPartitionedData(int globalIndex) {
+    try {
+      List<PartitionQuery<KeyIn, ValueIn>> partitionQueries = JetEngine.dataInStore.getPartitions(JetEngine.query);
+      List<JetInputOutputFormat<KeyIn, ValueIn>> resultsList = new ArrayList<>();
+      for (int partitionNo = globalIndex; partitionNo < partitionQueries.size(); partitionNo += totalParallelism) {
+        Result<KeyIn, ValueIn> result = partitionQueries.get(partitionNo).execute();
+        try {
+          while (result.next()) {
+            resultsList.add(new JetInputOutputFormat<>(result.getKey(), result.get()));
+          }
+        } finally {
+          // Release the store-side resources held by the result once it is drained.
+          result.close();
+        }
+      }
+      return resultsList;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
+
+class GoraJetProcessor<KeyIn, ValueIn extends PersistentBase> extends AbstractProcessor {
+
+ private final Traverser<JetInputOutputFormat<KeyIn, ValueIn>> traverser;
+
+ GoraJetProcessor(List<JetInputOutputFormat<KeyIn, ValueIn>> list) {
+ this.traverser = traverseIterable(list);
+ }
+
+ @Override
+ public boolean complete() {
+ return emitFromTraverser(traverser);
+ }
+}
diff --git a/gora-jet/src/test/avro/pageview.json b/gora-jet/src/test/avro/pageview.json
new file mode 100644
index 0000000..10412f2
--- /dev/null
+++ b/gora-jet/src/test/avro/pageview.json
@@ -0,0 +1,15 @@
+{
+ "type": "record",
+ "name": "Pageview", "default":null,
+ "namespace": "org.apache.gora.jet.generated",
+ "fields" : [
+ {"name": "url", "type": ["null","string"], "default":null},
+ {"name": "timestamp", "type": "long", "default":0},
+ {"name": "ip", "type": ["null","string"], "default":null},
+ {"name": "httpMethod", "type": ["null","string"], "default":null},
+ {"name": "httpStatusCode", "type": "int", "default":0},
+ {"name": "responseSize", "type": "int", "default":0},
+ {"name": "referrer", "type": ["null","string"], "default":null},
+ {"name": "userAgent", "type": ["null","string"], "default":null}
+ ]
+}
diff --git a/gora-jet/src/test/avro/resultPageView.json b/gora-jet/src/test/avro/resultPageView.json
new file mode 100644
index 0000000..50eb914
--- /dev/null
+++ b/gora-jet/src/test/avro/resultPageView.json
@@ -0,0 +1,10 @@
+{
+ "type": "record",
+ "name": "ResultPageView", "default":null,
+ "namespace": "org.apache.gora.jet.generated",
+ "fields" : [
+ {"name": "url", "type": ["null","string"], "default":null},
+ {"name": "timestamp", "type": "long", "default":0},
+ {"name": "ip", "type": ["null","string"], "default":null}
+ ]
+}
diff --git a/gora-jet/src/test/java/org/apache/gora/jet/JetTest.java b/gora-jet/src/test/java/org/apache/gora/jet/JetTest.java
new file mode 100644
index 0000000..1f813db
--- /dev/null
+++ b/gora-jet/src/test/java/org/apache/gora/jet/JetTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.jet;
+
+import com.hazelcast.core.IMap;
+import com.hazelcast.jet.Jet;
+import com.hazelcast.jet.JetInstance;
+import com.hazelcast.jet.pipeline.BatchSource;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import org.apache.gora.jet.generated.Pageview;
+import org.apache.gora.jet.generated.ResultPageView;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.regex.Pattern;
+
+import static com.hazelcast.jet.Traversers.traverseArray;
+import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
+import static com.hazelcast.jet.function.Functions.wholeItem;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test case for jet sink and source connectors.
+ *
+ * <p>The tests run against an HBase mini cluster started once in
+ * {@link #insertData()} and against Jet instances started inside each test.
+ */
+public class JetTest {
+
+  private static DataStore<Long, ResultPageView> dataStoreOut;
+  private static Query<Long, Pageview> query = null;
+
+  private static HBaseTestingUtility utility;
+
+  /**
+   * Starts the HBase mini cluster and stores the fixture records under keys 1, 2 and 3.
+   *
+   * @throws Exception if the mini cluster or the data store cannot be set up
+   */
+  @BeforeClass
+  public static void insertData() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.startMiniCluster();
+
+    dataStoreOut = DataStoreFactory.getDataStore(Long.class, ResultPageView.class, utility.getConfiguration());
+
+    ResultPageView resultPageView = new ResultPageView();
+    resultPageView.setIp("88.240.129.183");
+    resultPageView.setTimestamp(123L);
+    resultPageView.setUrl("I am the the one");
+
+    ResultPageView resultPageView1 = new ResultPageView();
+    resultPageView1.setIp("87.240.129.170");
+    resultPageView1.setTimestamp(124L);
+    resultPageView1.setUrl("How are you");
+
+    // NOTE(review): the three setters below target resultPageView1 again, so they
+    // overwrite the values set just above and resultPageView2 is stored without
+    // any field set. This looks like a copy/paste slip, but the assertions in the
+    // tests may depend on the data it produces - confirm before changing it.
+    ResultPageView resultPageView2 = new ResultPageView();
+    resultPageView1.setIp("88.240.129.183");
+    resultPageView1.setTimestamp(124L);
+    resultPageView1.setUrl("This is the jet engine");
+
+    dataStoreOut.put(1L, resultPageView);
+    dataStoreOut.put(2L, resultPageView1);
+    dataStoreOut.put(3L, resultPageView2);
+    dataStoreOut.flush();
+  }
+
+  /**
+   * Runs a source -> filter -> map -> sink pipeline and checks the records
+   * found in the output store afterwards.
+   *
+   * @throws Exception if the stores cannot be created or the job fails
+   */
+  @Test
+  public void testNewJetSource() throws Exception {
+    DataStore<Long, Pageview> dataStoreIn =
+        DataStoreFactory.getDataStore(Long.class, Pageview.class, utility.getConfiguration());
+    dataStoreOut = DataStoreFactory.getDataStore(Long.class, ResultPageView.class, utility.getConfiguration());
+
+    query = dataStoreIn.newQuery();
+    query.setStartKey(0L);
+    query.setEndKey(55L);
+
+    JetEngine<Long, Pageview, Long, ResultPageView> jetEngine = new JetEngine<>();
+    BatchSource<JetInputOutputFormat<Long, Pageview>> fileSource = jetEngine.createDataSource(dataStoreIn, query);
+    Pipeline p = Pipeline.create();
+    p.drawFrom(fileSource)
+        .filter(item -> item.getValue().getIp().toString().equals("88.240.129.183"))
+        .map(e -> {
+          ResultPageView resultPageView = new ResultPageView();
+          resultPageView.setIp(e.getValue().getIp());
+          resultPageView.setTimestamp(e.getValue().getTimestamp());
+          resultPageView.setUrl(e.getValue().getUrl());
+          return new JetInputOutputFormat<Long, ResultPageView>(e.getValue().getTimestamp(), resultPageView);
+        })
+        .drainTo(jetEngine.createDataSink(dataStoreOut));
+
+    try {
+      JetInstance jet = Jet.newJetInstance();
+      // Second instance; presumably started so the job runs on a two-member
+      // cluster - TODO confirm.
+      Jet.newJetInstance();
+      jet.newJob(p).join();
+    } finally {
+      Jet.shutdownAll();
+    }
+
+    Query<Long, ResultPageView> resultQuery = dataStoreOut.newQuery();
+    Result<Long, ResultPageView> result = resultQuery.execute();
+    int noOfOutputRecords = 0;
+    while (result.next()) {
+      noOfOutputRecords++;
+      String ip = result.get().getIp().toString();
+      assertEquals("88.240.129.183", ip);
+    }
+    assertEquals(2, noOfOutputRecords);
+  }
+
+  /**
+   * Counts the words of the stored urls with a Jet pipeline fed by the Gora
+   * source and checks the count of the word "the".
+   *
+   * @throws GoraException if the data store cannot be created
+   */
+  @Test
+  public void jetWordCount() throws GoraException {
+    dataStoreOut = DataStoreFactory.getDataStore(Long.class, ResultPageView.class, utility.getConfiguration());
+
+    Query<Long, ResultPageView> wordQuery = dataStoreOut.newQuery();
+    JetEngine<Long, ResultPageView, Long, ResultPageView> jetEngine = new JetEngine<>();
+
+    Pattern delimiter = Pattern.compile("\\W+");
+    Pipeline p = Pipeline.create();
+    p.drawFrom(jetEngine.createDataSource(dataStoreOut, wordQuery))
+        .flatMap(e -> traverseArray(delimiter.split(e.getValue().getUrl().toString())))
+        .filter(word -> !word.isEmpty())
+        .groupingKey(wholeItem())
+        .aggregate(counting())
+        .drainTo(Sinks.map("COUNTS"));
+
+    try {
+      JetInstance jet = Jet.newJetInstance();
+      jet.newJob(p).join();
+      // Read the result map before the instance is shut down.
+      IMap<String, Long> counts = jet.getMap("COUNTS");
+      assertEquals(3L, (long) counts.get("the"));
+    } finally {
+      // Mirror testNewJetSource: do not leave the Jet instance running after the test.
+      Jet.shutdownAll();
+    }
+  }
+}
diff --git a/gora-jet/src/test/java/org/apache/gora/jet/generated/Pageview.java b/gora-jet/src/test/java/org/apache/gora/jet/generated/Pageview.java
new file mode 100644
index 0000000..9cef268
--- /dev/null
+++ b/gora-jet/src/test/java/org/apache/gora/jet/generated/Pageview.java
@@ -0,0 +1,881 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+package org.apache.gora.jet.generated;
+
+public class Pageview extends org.apache.gora.persistency.impl.PersistentBase implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Pageview\",\"namespace\":\"org.apache.gora.jet.generated\",\"fields\":[{\"name\":\"url\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp\",\"type\":\"long\",\"default\":0},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"httpMethod\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"httpStatusCode\ [...]
+ private static final long serialVersionUID = 483475731342327584L;
+ /** Enum containing all data bean's fields. */
+ public static enum Field {
+ URL(0, "url"),
+ TIMESTAMP(1, "timestamp"),
+ IP(2, "ip"),
+ HTTP_METHOD(3, "httpMethod"),
+ HTTP_STATUS_CODE(4, "httpStatusCode"),
+ RESPONSE_SIZE(5, "responseSize"),
+ REFERRER(6, "referrer"),
+ USER_AGENT(7, "userAgent"),
+ ;
+ /**
+ * Field's index.
+ */
+ private int index;
+
+ /**
+ * Field's name.
+ */
+ private String name;
+
+ /**
+ * Field's constructor
+ * @param index field's index.
+ * @param name field's name.
+ */
+ Field(int index, String name) {this.index=index;this.name=name;}
+
+ /**
+ * Gets field's index.
+ * @return int field's index.
+ */
+ public int getIndex() {return index;}
+
+ /**
+ * Gets field's name.
+ * @return String field's name.
+ */
+ public String getName() {return name;}
+
+ /**
+ * Gets field's attributes to string.
+ * @return String field's attributes to string.
+ */
+ public String toString() {return name;}
+ };
+
+ public static final String[] _ALL_FIELDS = {
+ "url",
+ "timestamp",
+ "ip",
+ "httpMethod",
+ "httpStatusCode",
+ "responseSize",
+ "referrer",
+ "userAgent",
+ };
+
+ /**
+ * Gets the total field count.
+ * @return int field count
+ */
+ public int getFieldsCount() {
+ return Pageview._ALL_FIELDS.length;
+ }
+
+ private java.lang.CharSequence url;
+ private long timestamp;
+ private java.lang.CharSequence ip;
+ private java.lang.CharSequence httpMethod;
+ private int httpStatusCode;
+ private int responseSize;
+ private java.lang.CharSequence referrer;
+ private java.lang.CharSequence userAgent;
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return url;
+ case 1: return timestamp;
+ case 2: return ip;
+ case 3: return httpMethod;
+ case 4: return httpStatusCode;
+ case 5: return responseSize;
+ case 6: return referrer;
+ case 7: return userAgent;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value) {
+ switch (field$) {
+ case 0: url = (java.lang.CharSequence)(value); break;
+ case 1: timestamp = (java.lang.Long)(value); break;
+ case 2: ip = (java.lang.CharSequence)(value); break;
+ case 3: httpMethod = (java.lang.CharSequence)(value); break;
+ case 4: httpStatusCode = (java.lang.Integer)(value); break;
+ case 5: responseSize = (java.lang.Integer)(value); break;
+ case 6: referrer = (java.lang.CharSequence)(value); break;
+ case 7: userAgent = (java.lang.CharSequence)(value); break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'url' field.
+ */
+ public java.lang.CharSequence getUrl() {
+ return url;
+ }
+
+ /**
+ * Sets the value of the 'url' field.
+ * @param value the value to set.
+ */
+ public void setUrl(java.lang.CharSequence value) {
+ this.url = value;
+ setDirty(0);
+ }
+
+ /**
+ * Checks the dirty status of the 'url' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isUrlDirty() {
+ return isDirty(0);
+ }
+
+ /**
+ * Gets the value of the 'timestamp' field.
+ */
+ public java.lang.Long getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Sets the value of the 'timestamp' field.
+ * @param value the value to set.
+ */
+ public void setTimestamp(java.lang.Long value) {
+ this.timestamp = value;
+ setDirty(1);
+ }
+
+ /**
+ * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isTimestampDirty() {
+ return isDirty(1);
+ }
+
+ /**
+ * Gets the value of the 'ip' field.
+ */
+ public java.lang.CharSequence getIp() {
+ return ip;
+ }
+
+ /**
+ * Sets the value of the 'ip' field.
+ * @param value the value to set.
+ */
+ public void setIp(java.lang.CharSequence value) {
+ this.ip = value;
+ setDirty(2);
+ }
+
+ /**
+ * Checks the dirty status of the 'ip' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isIpDirty() {
+ return isDirty(2);
+ }
+
+ /**
+ * Gets the value of the 'httpMethod' field.
+ */
+ public java.lang.CharSequence getHttpMethod() {
+ return httpMethod;
+ }
+
+ /**
+ * Sets the value of the 'httpMethod' field.
+ * @param value the value to set.
+ */
+ public void setHttpMethod(java.lang.CharSequence value) {
+ this.httpMethod = value;
+ setDirty(3);
+ }
+
+ /**
+ * Checks the dirty status of the 'httpMethod' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isHttpMethodDirty() {
+ return isDirty(3);
+ }
+
+ /**
+ * Gets the value of the 'httpStatusCode' field.
+ */
+ public java.lang.Integer getHttpStatusCode() {
+ return httpStatusCode;
+ }
+
+ /**
+ * Sets the value of the 'httpStatusCode' field.
+ * @param value the value to set.
+ */
+ public void setHttpStatusCode(java.lang.Integer value) {
+ this.httpStatusCode = value;
+ setDirty(4);
+ }
+
+ /**
+ * Checks the dirty status of the 'httpStatusCode' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isHttpStatusCodeDirty() {
+ return isDirty(4);
+ }
+
+ /**
+ * Gets the value of the 'responseSize' field.
+ */
+ public java.lang.Integer getResponseSize() {
+ return responseSize;
+ }
+
+ /**
+ * Sets the value of the 'responseSize' field.
+ * @param value the value to set.
+ */
+ public void setResponseSize(java.lang.Integer value) {
+ this.responseSize = value;
+ setDirty(5);
+ }
+
+ /**
+ * Checks the dirty status of the 'responseSize' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isResponseSizeDirty() {
+ return isDirty(5);
+ }
+
+ /**
+ * Gets the value of the 'referrer' field.
+ */
+ public java.lang.CharSequence getReferrer() {
+ return referrer;
+ }
+
+ /**
+ * Sets the value of the 'referrer' field.
+ * @param value the value to set.
+ */
+ public void setReferrer(java.lang.CharSequence value) {
+ this.referrer = value;
+ setDirty(6);
+ }
+
+ /**
+ * Checks the dirty status of the 'referrer' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isReferrerDirty() {
+ return isDirty(6);
+ }
+
+ /**
+ * Gets the value of the 'userAgent' field.
+ */
+ public java.lang.CharSequence getUserAgent() {
+ return userAgent;
+ }
+
+ /**
+ * Sets the value of the 'userAgent' field.
+ * @param value the value to set.
+ */
+ public void setUserAgent(java.lang.CharSequence value) {
+ this.userAgent = value;
+ setDirty(7);
+ }
+
+ /**
+ * Checks the dirty status of the 'userAgent' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isUserAgentDirty() {
+ return isDirty(7);
+ }
+
+ /** Creates a new Pageview RecordBuilder */
+ public static org.apache.gora.jet.generated.Pageview.Builder newBuilder() {
+ return new org.apache.gora.jet.generated.Pageview.Builder();
+ }
+
+ /** Creates a new Pageview RecordBuilder by copying an existing Builder */
+ public static org.apache.gora.jet.generated.Pageview.Builder newBuilder(org.apache.gora.jet.generated.Pageview.Builder other) {
+ return new org.apache.gora.jet.generated.Pageview.Builder(other);
+ }
+
+ /** Creates a new Pageview RecordBuilder by copying an existing Pageview instance */
+ public static org.apache.gora.jet.generated.Pageview.Builder newBuilder(org.apache.gora.jet.generated.Pageview other) {
+ return new org.apache.gora.jet.generated.Pageview.Builder(other);
+ }
+
+ @Override
+ public org.apache.gora.jet.generated.Pageview clone() {
+ return newBuilder(this).build();
+ }
+
+ private static java.nio.ByteBuffer deepCopyToReadOnlyBuffer(
+ java.nio.ByteBuffer input) {
+ java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(input.capacity());
+ int position = input.position();
+ input.reset();
+ int mark = input.position();
+ int limit = input.limit();
+ input.rewind();
+ input.limit(input.capacity());
+ copy.put(input);
+ input.rewind();
+ copy.rewind();
+ input.position(mark);
+ input.mark();
+ copy.position(mark);
+ copy.mark();
+ input.position(position);
+ copy.position(position);
+ input.limit(limit);
+ copy.limit(limit);
+ return copy.asReadOnlyBuffer();
+ }
+
+ /**
+ * RecordBuilder for Pageview instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Pageview>
+ implements org.apache.avro.data.RecordBuilder<Pageview> {
+
+ private java.lang.CharSequence url;
+ private long timestamp;
+ private java.lang.CharSequence ip;
+ private java.lang.CharSequence httpMethod;
+ private int httpStatusCode;
+ private int responseSize;
+ private java.lang.CharSequence referrer;
+ private java.lang.CharSequence userAgent;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.gora.jet.generated.Pageview.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.gora.jet.generated.Pageview.Builder other) {
+ super(other);
+ }
+
+ /** Creates a Builder by copying an existing Pageview instance */
+ private Builder(org.apache.gora.jet.generated.Pageview other) {
+ super(org.apache.gora.jet.generated.Pageview.SCHEMA$);
+ if (isValidValue(fields()[0], other.url)) {
+ this.url = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.url);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.timestamp)) {
+ this.timestamp = (java.lang.Long) data().deepCopy(fields()[1].schema(), other.timestamp);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.ip)) {
+ this.ip = (java.lang.CharSequence) data().deepCopy(fields()[2].schema(), other.ip);
+ fieldSetFlags()[2] = true;
+ }
+ if (isValidValue(fields()[3], other.httpMethod)) {
+ this.httpMethod = (java.lang.CharSequence) data().deepCopy(fields()[3].schema(), other.httpMethod);
+ fieldSetFlags()[3] = true;
+ }
+ if (isValidValue(fields()[4], other.httpStatusCode)) {
+ this.httpStatusCode = (java.lang.Integer) data().deepCopy(fields()[4].schema(), other.httpStatusCode);
+ fieldSetFlags()[4] = true;
+ }
+ if (isValidValue(fields()[5], other.responseSize)) {
+ this.responseSize = (java.lang.Integer) data().deepCopy(fields()[5].schema(), other.responseSize);
+ fieldSetFlags()[5] = true;
+ }
+ if (isValidValue(fields()[6], other.referrer)) {
+ this.referrer = (java.lang.CharSequence) data().deepCopy(fields()[6].schema(), other.referrer);
+ fieldSetFlags()[6] = true;
+ }
+ if (isValidValue(fields()[7], other.userAgent)) {
+ this.userAgent = (java.lang.CharSequence) data().deepCopy(fields()[7].schema(), other.userAgent);
+ fieldSetFlags()[7] = true;
+ }
+ }
+
+ /** Gets the value of the 'url' field */
+ public java.lang.CharSequence getUrl() {
+ return url;
+ }
+
+ /** Sets the value of the 'url' field */
+ public org.apache.gora.jet.generated.Pageview.Builder setUrl(java.lang.CharSequence value) {
+ validate(fields()[0], value);
+ this.url = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'url' field has been set */
+ public boolean hasUrl() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'url' field */
+ public org.apache.gora.jet.generated.Pageview.Builder clearUrl() {
+ url = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'timestamp' field */
+ public java.lang.Long getTimestamp() {
+ return timestamp;
+ }
+
+ /** Sets the value of the 'timestamp' field */
+ public org.apache.gora.jet.generated.Pageview.Builder setTimestamp(long value) {
+ validate(fields()[1], value);
+ this.timestamp = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'timestamp' field has been set */
+ public boolean hasTimestamp() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'timestamp' field */
+ public org.apache.gora.jet.generated.Pageview.Builder clearTimestamp() {
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'ip' field */
+ public java.lang.CharSequence getIp() {
+ return ip;
+ }
+
+ /** Sets the value of the 'ip' field */
+ public org.apache.gora.jet.generated.Pageview.Builder setIp(java.lang.CharSequence value) {
+ validate(fields()[2], value);
+ this.ip = value;
+ fieldSetFlags()[2] = true;
+ return this;
+ }
+
+ /** Checks whether the 'ip' field has been set */
+ public boolean hasIp() {
+ return fieldSetFlags()[2];
+ }
+
+ /** Clears the value of the 'ip' field */
+ public org.apache.gora.jet.generated.Pageview.Builder clearIp() {
+ ip = null;
+ fieldSetFlags()[2] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'httpMethod' field */
+ public java.lang.CharSequence getHttpMethod() {
+ return httpMethod;
+ }
+
+ /** Sets the value of the 'httpMethod' field */
+ public org.apache.gora.jet.generated.Pageview.Builder setHttpMethod(java.lang.CharSequence value) {
+ validate(fields()[3], value);
+ this.httpMethod = value;
+ fieldSetFlags()[3] = true;
+ return this;
+ }
+
+ /** Checks whether the 'httpMethod' field has been set */
+ public boolean hasHttpMethod() {
+ return fieldSetFlags()[3];
+ }
+
+ /** Clears the value of the 'httpMethod' field */
+ public org.apache.gora.jet.generated.Pageview.Builder clearHttpMethod() {
+ httpMethod = null;
+ fieldSetFlags()[3] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'httpStatusCode' field */
+ public java.lang.Integer getHttpStatusCode() {
+ return httpStatusCode;
+ }
+
+ /** Sets the value of the 'httpStatusCode' field */
+ public org.apache.gora.jet.generated.Pageview.Builder setHttpStatusCode(int value) {
+ validate(fields()[4], value);
+ this.httpStatusCode = value;
+ fieldSetFlags()[4] = true;
+ return this;
+ }
+
+ /** Checks whether the 'httpStatusCode' field has been set */
+ public boolean hasHttpStatusCode() {
+ return fieldSetFlags()[4];
+ }
+
+ /** Clears the value of the 'httpStatusCode' field */
+ public org.apache.gora.jet.generated.Pageview.Builder clearHttpStatusCode() {
+ fieldSetFlags()[4] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'responseSize' field */
+ public java.lang.Integer getResponseSize() {
+ return responseSize;
+ }
+
+ /** Sets the value of the 'responseSize' field */
+ public org.apache.gora.jet.generated.Pageview.Builder setResponseSize(int value) {
+ validate(fields()[5], value);
+ this.responseSize = value;
+ fieldSetFlags()[5] = true;
+ return this;
+ }
+
+ /** Checks whether the 'responseSize' field has been set */
+ public boolean hasResponseSize() {
+ return fieldSetFlags()[5];
+ }
+
+ /** Clears the value of the 'responseSize' field */
+ public org.apache.gora.jet.generated.Pageview.Builder clearResponseSize() {
+ fieldSetFlags()[5] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'referrer' field */
+ public java.lang.CharSequence getReferrer() {
+ return referrer;
+ }
+
+ /** Sets the value of the 'referrer' field */
+ public org.apache.gora.jet.generated.Pageview.Builder setReferrer(java.lang.CharSequence value) {
+ validate(fields()[6], value);
+ this.referrer = value;
+ fieldSetFlags()[6] = true;
+ return this;
+ }
+
+ /** Checks whether the 'referrer' field has been set */
+ public boolean hasReferrer() {
+ return fieldSetFlags()[6];
+ }
+
+ /** Clears the value of the 'referrer' field */
+ public org.apache.gora.jet.generated.Pageview.Builder clearReferrer() {
+ referrer = null;
+ fieldSetFlags()[6] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'userAgent' field */
+ public java.lang.CharSequence getUserAgent() {
+ return userAgent;
+ }
+
+ /** Sets the value of the 'userAgent' field */
+ public org.apache.gora.jet.generated.Pageview.Builder setUserAgent(java.lang.CharSequence value) {
+ validate(fields()[7], value);
+ this.userAgent = value;
+ fieldSetFlags()[7] = true;
+ return this;
+ }
+
+ /** Checks whether the 'userAgent' field has been set */
+ public boolean hasUserAgent() {
+ return fieldSetFlags()[7];
+ }
+
+ /** Clears the value of the 'userAgent' field */
+ public org.apache.gora.jet.generated.Pageview.Builder clearUserAgent() {
+ userAgent = null;
+ fieldSetFlags()[7] = false;
+ return this;
+ }
+
+ @Override
+ public Pageview build() {
+ try {
+ Pageview record = new Pageview();
+ record.url = fieldSetFlags()[0] ? this.url : (java.lang.CharSequence) defaultValue(fields()[0]);
+ record.timestamp = fieldSetFlags()[1] ? this.timestamp : (java.lang.Long) defaultValue(fields()[1]);
+ record.ip = fieldSetFlags()[2] ? this.ip : (java.lang.CharSequence) defaultValue(fields()[2]);
+ record.httpMethod = fieldSetFlags()[3] ? this.httpMethod : (java.lang.CharSequence) defaultValue(fields()[3]);
+ record.httpStatusCode = fieldSetFlags()[4] ? this.httpStatusCode : (java.lang.Integer) defaultValue(fields()[4]);
+ record.responseSize = fieldSetFlags()[5] ? this.responseSize : (java.lang.Integer) defaultValue(fields()[5]);
+ record.referrer = fieldSetFlags()[6] ? this.referrer : (java.lang.CharSequence) defaultValue(fields()[6]);
+ record.userAgent = fieldSetFlags()[7] ? this.userAgent : (java.lang.CharSequence) defaultValue(fields()[7]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+
+ public Pageview.Tombstone getTombstone(){
+ return TOMBSTONE;
+ }
+
+ public Pageview newInstance(){
+ return newBuilder().build();
+ }
+
+ private static final Tombstone TOMBSTONE = new Tombstone();
+
+ public static final class Tombstone extends Pageview implements org.apache.gora.persistency.Tombstone {
+
+ private Tombstone() { }
+
+ /**
+ * Gets the value of the 'url' field.
+ */
+ public java.lang.CharSequence getUrl() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'url' field.
+ * @param value the value to set.
+ */
+ public void setUrl(java.lang.CharSequence value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'url' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isUrlDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'timestamp' field.
+ */
+ public java.lang.Long getTimestamp() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'timestamp' field.
+ * @param value the value to set.
+ */
+ public void setTimestamp(java.lang.Long value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isTimestampDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'ip' field.
+ */
+ public java.lang.CharSequence getIp() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'ip' field.
+ * @param value the value to set.
+ */
+ public void setIp(java.lang.CharSequence value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'ip' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isIpDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'httpMethod' field.
+ */
+ public java.lang.CharSequence getHttpMethod() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'httpMethod' field.
+ * @param value the value to set.
+ */
+ public void setHttpMethod(java.lang.CharSequence value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'httpMethod' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isHttpMethodDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'httpStatusCode' field.
+ */
+ public java.lang.Integer getHttpStatusCode() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'httpStatusCode' field.
+ * @param value the value to set.
+ */
+ public void setHttpStatusCode(java.lang.Integer value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'httpStatusCode' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isHttpStatusCodeDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'responseSize' field.
+ */
+ public java.lang.Integer getResponseSize() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'responseSize' field.
+ * @param value the value to set.
+ */
+ public void setResponseSize(java.lang.Integer value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'responseSize' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isResponseSizeDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'referrer' field.
+ */
+ public java.lang.CharSequence getReferrer() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'referrer' field.
+ * @param value the value to set.
+ */
+ public void setReferrer(java.lang.CharSequence value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'referrer' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isReferrerDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'userAgent' field.
+ */
+ public java.lang.CharSequence getUserAgent() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'userAgent' field.
+ * @param value the value to set.
+ */
+ public void setUserAgent(java.lang.CharSequence value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'userAgent' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isUserAgentDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+
+ }
+
+ private static final org.apache.avro.io.DatumWriter
+ DATUM_WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
+ private static final org.apache.avro.io.DatumReader
+ DATUM_READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
+
+ /**
+ * Writes AVRO data bean to output stream in the form of AVRO Binary encoding format. This will transform
+ * AVRO data bean from its Java object form to its serializable form.
+ *
+ * @param out java.io.ObjectOutput output stream to write data bean in serializable form
+ */
+ @Override
+ public void writeExternal(java.io.ObjectOutput out)
+ throws java.io.IOException {
+ out.write(super.getDirtyBytes().array());
+ DATUM_WRITER$.write(this, org.apache.avro.io.EncoderFactory.get()
+ .directBinaryEncoder((java.io.OutputStream) out,
+ null));
+ }
+
+ /**
+ * Reads AVRO data bean from input stream in its AVRO Binary encoding format to Java object format.
+ * This will transform AVRO data bean from its serializable form to deserialized Java object form.
+ *
+ * @param in java.io.ObjectInput input stream to read data bean in serializable form
+ */
+ @Override
+ public void readExternal(java.io.ObjectInput in)
+     throws java.io.IOException {
+   // The stream starts with getFieldsCount() bytes of dirty flags, followed by the
+   // Avro binary payload.
+   byte[] __g__dirty = new byte[getFieldsCount()];
+   // readFully() blocks until the whole buffer is filled (or throws EOFException).
+   // A plain read() may return after a partial read, which would leave the flags
+   // incomplete and start the Avro decoder at the wrong stream offset.
+   in.readFully(__g__dirty);
+   super.setDirtyBytes(java.nio.ByteBuffer.wrap(__g__dirty));
+   DATUM_READER$.read(this, org.apache.avro.io.DecoderFactory.get()
+       .directBinaryDecoder((java.io.InputStream) in,
+           null));
+ }
+
+}
+
diff --git a/gora-jet/src/test/java/org/apache/gora/jet/generated/ResultPageView.java b/gora-jet/src/test/java/org/apache/gora/jet/generated/ResultPageView.java
new file mode 100644
index 0000000..c1f43fb
--- /dev/null
+++ b/gora-jet/src/test/java/org/apache/gora/jet/generated/ResultPageView.java
@@ -0,0 +1,468 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing, software
+ *distributed under the License is distributed on an "AS IS" BASIS,
+ *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *See the License for the specific language governing permissions and
+ *limitations under the License.
+ */
+package org.apache.gora.jet.generated;
+
+public class ResultPageView extends org.apache.gora.persistency.impl.PersistentBase implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ResultPageView\",\"namespace\":\"org.apache.gora.jet.generated\",\"fields\":[{\"name\":\"url\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp\",\"type\":\"long\",\"default\":0},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null}],\"default\":null}");
+ private static final long serialVersionUID = -7453871077431322534L;
+ /** Enum containing all data bean's fields. */
+ public static enum Field {
+ URL(0, "url"),
+ TIMESTAMP(1, "timestamp"),
+ IP(2, "ip"),
+ ;
+ /**
+ * Field's index.
+ */
+ private int index;
+
+ /**
+ * Field's name.
+ */
+ private String name;
+
+ /**
+ * Field's constructor
+ * @param index field's index.
+ * @param name field's name.
+ */
+ Field(int index, String name) {this.index=index;this.name=name;}
+
+ /**
+ * Gets field's index.
+ * @return int field's index.
+ */
+ public int getIndex() {return index;}
+
+ /**
+ * Gets field's name.
+ * @return String field's name.
+ */
+ public String getName() {return name;}
+
+ /**
+ * Gets field's attributes to string.
+ * @return String field's attributes to string.
+ */
+ public String toString() {return name;}
+ };
+
+ public static final String[] _ALL_FIELDS = {
+ "url",
+ "timestamp",
+ "ip",
+ };
+
+ /**
+ * Gets the total field count.
+ * @return int field count
+ */
+ public int getFieldsCount() {
+ return ResultPageView._ALL_FIELDS.length;
+ }
+
+ private java.lang.CharSequence url;
+ private long timestamp;
+ private java.lang.CharSequence ip;
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return url;
+ case 1: return timestamp;
+ case 2: return ip;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value) {
+ switch (field$) {
+ case 0: url = (java.lang.CharSequence)(value); break;
+ case 1: timestamp = (java.lang.Long)(value); break;
+ case 2: ip = (java.lang.CharSequence)(value); break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'url' field.
+ */
+ public java.lang.CharSequence getUrl() {
+ return url;
+ }
+
+ /**
+ * Sets the value of the 'url' field.
+ * @param value the value to set.
+ */
+ public void setUrl(java.lang.CharSequence value) {
+ this.url = value;
+ setDirty(0);
+ }
+
+ /**
+ * Checks the dirty status of the 'url' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @return true if the 'url' field is dirty, false otherwise.
+ */
+ public boolean isUrlDirty() {
+ return isDirty(0);
+ }
+
+ /**
+ * Gets the value of the 'timestamp' field.
+ */
+ public java.lang.Long getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Sets the value of the 'timestamp' field.
+ * @param value the value to set.
+ */
+ public void setTimestamp(java.lang.Long value) {
+ this.timestamp = value;
+ setDirty(1);
+ }
+
+ /**
+ * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @return true if the 'timestamp' field is dirty, false otherwise.
+ */
+ public boolean isTimestampDirty() {
+ return isDirty(1);
+ }
+
+ /**
+ * Gets the value of the 'ip' field.
+ */
+ public java.lang.CharSequence getIp() {
+ return ip;
+ }
+
+ /**
+ * Sets the value of the 'ip' field.
+ * @param value the value to set.
+ */
+ public void setIp(java.lang.CharSequence value) {
+ this.ip = value;
+ setDirty(2);
+ }
+
+ /**
+ * Checks the dirty status of the 'ip' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @return true if the 'ip' field is dirty, false otherwise.
+ */
+ public boolean isIpDirty() {
+ return isDirty(2);
+ }
+
+ /** Creates a new ResultPageView RecordBuilder */
+ public static org.apache.gora.jet.generated.ResultPageView.Builder newBuilder() {
+ return new org.apache.gora.jet.generated.ResultPageView.Builder();
+ }
+
+ /** Creates a new ResultPageView RecordBuilder by copying an existing Builder */
+ public static org.apache.gora.jet.generated.ResultPageView.Builder newBuilder(org.apache.gora.jet.generated.ResultPageView.Builder other) {
+ return new org.apache.gora.jet.generated.ResultPageView.Builder(other);
+ }
+
+ /** Creates a new ResultPageView RecordBuilder by copying an existing ResultPageView instance */
+ public static org.apache.gora.jet.generated.ResultPageView.Builder newBuilder(org.apache.gora.jet.generated.ResultPageView other) {
+ return new org.apache.gora.jet.generated.ResultPageView.Builder(other);
+ }
+
+ @Override
+ public org.apache.gora.jet.generated.ResultPageView clone() {
+ return newBuilder(this).build();
+ }
+
+ /**
+ * Copies the full content of {@code input} (index 0 up to its capacity) into a newly
+ * allocated buffer and returns a read-only view of that copy carrying the same
+ * position, mark and limit as {@code input}. The position, mark and limit of
+ * {@code input} are put back to their incoming values before returning.
+ *
+ * NOTE(review): {@code input.reset()} requires a mark to have been set on
+ * {@code input}; per java.nio.Buffer it throws InvalidMarkException otherwise.
+ * No caller is visible in this part of the file, so confirm callers always mark first.
+ *
+ * @param input the buffer to copy
+ * @return a read-only copy of {@code input}
+ */
+ private static java.nio.ByteBuffer deepCopyToReadOnlyBuffer(
+ java.nio.ByteBuffer input) {
+ java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(input.capacity());
+ // Save the current position before it is disturbed below.
+ int position = input.position();
+ // reset() moves the position to the mark, so the next position() call yields the mark.
+ input.reset();
+ int mark = input.position();
+ int limit = input.limit();
+ // Open the window to [0, capacity) so put() transfers every byte of the buffer.
+ input.rewind();
+ input.limit(input.capacity());
+ copy.put(input);
+ input.rewind();
+ copy.rewind();
+ // rewind() discarded the mark; re-establish it on both buffers.
+ input.position(mark);
+ input.mark();
+ copy.position(mark);
+ copy.mark();
+ // Restore position first, then limit, on the original and mirror both onto the copy.
+ input.position(position);
+ copy.position(position);
+ input.limit(limit);
+ copy.limit(limit);
+ return copy.asReadOnlyBuffer();
+ }
+
+ /**
+ * RecordBuilder for ResultPageView instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<ResultPageView>
+ implements org.apache.avro.data.RecordBuilder<ResultPageView> {
+
+ private java.lang.CharSequence url;
+ private long timestamp;
+ private java.lang.CharSequence ip;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.gora.jet.generated.ResultPageView.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.gora.jet.generated.ResultPageView.Builder other) {
+   super(other);
+   // The field values are declared in this class, so the superclass constructor cannot
+   // copy them. NOTE(review): Avro's RecordBuilderBase copy constructor is expected to
+   // carry over only the field-set flags — confirm against the Avro version in use.
+   // Without the copies below, a copied Builder would report fields as set while
+   // holding null/0 values.
+   if (other.hasUrl()) {
+     this.url = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.url);
+     fieldSetFlags()[0] = true;
+   }
+   if (other.hasTimestamp()) {
+     // primitive long: a plain assignment is already a copy
+     this.timestamp = other.timestamp;
+     fieldSetFlags()[1] = true;
+   }
+   if (other.hasIp()) {
+     this.ip = (java.lang.CharSequence) data().deepCopy(fields()[2].schema(), other.ip);
+     fieldSetFlags()[2] = true;
+   }
+ }
+
+ /** Creates a Builder by copying an existing ResultPageView instance */
+ private Builder(org.apache.gora.jet.generated.ResultPageView other) {
+ super(org.apache.gora.jet.generated.ResultPageView.SCHEMA$);
+ if (isValidValue(fields()[0], other.url)) {
+ this.url = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.url);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.timestamp)) {
+ this.timestamp = (java.lang.Long) data().deepCopy(fields()[1].schema(), other.timestamp);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.ip)) {
+ this.ip = (java.lang.CharSequence) data().deepCopy(fields()[2].schema(), other.ip);
+ fieldSetFlags()[2] = true;
+ }
+ }
+
+ /** Gets the value of the 'url' field */
+ public java.lang.CharSequence getUrl() {
+ return url;
+ }
+
+ /** Sets the value of the 'url' field */
+ public org.apache.gora.jet.generated.ResultPageView.Builder setUrl(java.lang.CharSequence value) {
+ validate(fields()[0], value);
+ this.url = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'url' field has been set */
+ public boolean hasUrl() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'url' field */
+ public org.apache.gora.jet.generated.ResultPageView.Builder clearUrl() {
+ url = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'timestamp' field */
+ public java.lang.Long getTimestamp() {
+ return timestamp;
+ }
+
+ /** Sets the value of the 'timestamp' field */
+ public org.apache.gora.jet.generated.ResultPageView.Builder setTimestamp(long value) {
+ validate(fields()[1], value);
+ this.timestamp = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'timestamp' field has been set */
+ public boolean hasTimestamp() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'timestamp' field */
+ public org.apache.gora.jet.generated.ResultPageView.Builder clearTimestamp() {
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'ip' field */
+ public java.lang.CharSequence getIp() {
+ return ip;
+ }
+
+ /** Sets the value of the 'ip' field */
+ public org.apache.gora.jet.generated.ResultPageView.Builder setIp(java.lang.CharSequence value) {
+ validate(fields()[2], value);
+ this.ip = value;
+ fieldSetFlags()[2] = true;
+ return this;
+ }
+
+ /** Checks whether the 'ip' field has been set */
+ public boolean hasIp() {
+ return fieldSetFlags()[2];
+ }
+
+ /** Clears the value of the 'ip' field */
+ public org.apache.gora.jet.generated.ResultPageView.Builder clearIp() {
+ ip = null;
+ fieldSetFlags()[2] = false;
+ return this;
+ }
+
+ @Override
+ public ResultPageView build() {
+ try {
+ ResultPageView record = new ResultPageView();
+ record.url = fieldSetFlags()[0] ? this.url : (java.lang.CharSequence) defaultValue(fields()[0]);
+ record.timestamp = fieldSetFlags()[1] ? this.timestamp : (java.lang.Long) defaultValue(fields()[1]);
+ record.ip = fieldSetFlags()[2] ? this.ip : (java.lang.CharSequence) defaultValue(fields()[2]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+
+ public ResultPageView.Tombstone getTombstone(){
+ return TOMBSTONE;
+ }
+
+ public ResultPageView newInstance(){
+ return newBuilder().build();
+ }
+
+ private static final Tombstone TOMBSTONE = new Tombstone();
+
+ public static final class Tombstone extends ResultPageView implements org.apache.gora.persistency.Tombstone {
+
+ private Tombstone() { }
+
+ /**
+ * Gets the value of the 'url' field.
+ */
+ public java.lang.CharSequence getUrl() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'url' field.
+ * @param value the value to set.
+ */
+ public void setUrl(java.lang.CharSequence value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'url' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isUrlDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'timestamp' field.
+ */
+ public java.lang.Long getTimestamp() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'timestamp' field.
+ * @param value the value to set.
+ */
+ public void setTimestamp(java.lang.Long value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isTimestampDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+ /**
+ * Gets the value of the 'ip' field.
+ */
+ public java.lang.CharSequence getIp() {
+ throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+ }
+
+ /**
+ * Sets the value of the 'ip' field.
+ * @param value the value to set.
+ */
+ public void setIp(java.lang.CharSequence value) {
+ throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+ }
+
+ /**
+ * Checks the dirty status of the 'ip' field. A field is dirty if it represents a change that has not yet been written to the database.
+ * @param value the value to set.
+ */
+ public boolean isIpDirty() {
+ throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+ }
+
+
+ }
+
+ private static final org.apache.avro.io.DatumWriter
+ DATUM_WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
+ private static final org.apache.avro.io.DatumReader
+ DATUM_READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
+
+ /**
+ * Writes AVRO data bean to output stream in the form of AVRO Binary encoding format. This will transform
+ * AVRO data bean from its Java object form to its serializable form.
+ *
+ * @param out java.io.ObjectOutput output stream to write data bean in serializable form
+ */
+ @Override
+ public void writeExternal(java.io.ObjectOutput out)
+ throws java.io.IOException {
+ out.write(super.getDirtyBytes().array());
+ DATUM_WRITER$.write(this, org.apache.avro.io.EncoderFactory.get()
+ .directBinaryEncoder((java.io.OutputStream) out,
+ null));
+ }
+
+ /**
+ * Reads AVRO data bean from input stream in its AVRO Binary encoding format to Java object format.
+ * This will transform AVRO data bean from its serializable form to deserialized Java object form.
+ *
+ * @param in java.io.ObjectInput input stream to read data bean in serializable form
+ */
+ @Override
+ public void readExternal(java.io.ObjectInput in)
+     throws java.io.IOException {
+   // The stream starts with getFieldsCount() bytes of dirty flags, followed by the
+   // Avro binary payload.
+   byte[] __g__dirty = new byte[getFieldsCount()];
+   // readFully() blocks until the whole buffer is filled (or throws EOFException).
+   // A plain read() may return after a partial read, which would leave the flags
+   // incomplete and start the Avro decoder at the wrong stream offset.
+   in.readFully(__g__dirty);
+   super.setDirtyBytes(java.nio.ByteBuffer.wrap(__g__dirty));
+   DATUM_READER$.read(this, org.apache.avro.io.DecoderFactory.get()
+       .directBinaryDecoder((java.io.InputStream) in,
+           null));
+ }
+
+}
+
diff --git a/gora-tutorial/pom.xml b/gora-tutorial/pom.xml
index e89ff95..d0fcf8f 100644
--- a/gora-tutorial/pom.xml
+++ b/gora-tutorial/pom.xml
@@ -105,6 +105,11 @@
<dependency>
<groupId>org.apache.gora</groupId>
+ <artifactId>gora-jet</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.gora</groupId>
<artifactId>gora-jcache</artifactId>
</dependency>
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsJet.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsJet.java
new file mode 100644
index 0000000..e17d490
--- /dev/null
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsJet.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.tutorial.log;
+
+import com.hazelcast.jet.Jet;
+import com.hazelcast.jet.JetInstance;
+import com.hazelcast.jet.pipeline.Pipeline;
+import org.apache.gora.jet.JetEngine;
+import org.apache.gora.jet.JetInputOutputFormat;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.tutorial.log.generated.MetricDatum;
+import org.apache.gora.tutorial.log.generated.Pageview;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
+import static com.hazelcast.jet.aggregate.AggregateOperations.groupingBy;
+
+/**
+ * LogAnalyticsJet is the tutorial class to illustrate Gora's Hazelcast Jet API.
+ * The analytics jet reads the web access data stored earlier by the
+ * {@link LogManager}, and calculates the aggregate daily pageviews. The
+ * output is stored in a Gora compatible data store.
+ *
+ * <p>See the tutorial.html file in docs or go to the
+ * <a href="http://gora.apache.org/current/tutorial.html">
+ * web site</a> for more information.</p>
+ */
+ public class LogAnalyticsJet {
+
+   private static DataStore<Long, Pageview> inStore;
+   private static DataStore<String, MetricDatum> outStore;
+
+   /**
+    * The number of milliseconds in a day
+    */
+   private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
+
+   /**
+    * In the main method pageviews are fetched through the jet source connector.
+    * Then those are grouped by url and day. Then a counting aggregator is
+    * applied to calculate the aggregated daily pageviews. Then the result is
+    * output through the jet sink connector to a gora compatible data store.
+    *
+    * @param args command line arguments (not used)
+    * @throws Exception if the data stores cannot be created or the job fails
+    */
+   public static void main(String[] args) throws Exception {
+
+     Configuration conf = new Configuration();
+
+     inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
+     outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
+
+     Query<Long, Pageview> query = inStore.newQuery();
+     JetEngine<Long, Pageview, String, MetricDatum> jetEngine = new JetEngine<>();
+
+     Pipeline p = Pipeline.create();
+     p.drawFrom(jetEngine.createDataSource(inStore, query))
+         .groupingKey(e -> e.getValue().getUrl().toString())
+         .aggregate(groupingBy(e -> getDay(e.getValue().getTimestamp()), counting()))
+         .map(e -> {
+           // e is (url -> (day -> number of pageviews on that day))
+           MetricDatum metricDatum = new MetricDatum();
+           String url = e.getKey();
+           // NOTE(review): a single MetricDatum is reused for every day of this url, so
+           // only the values of the last iterated day end up in the emitted record.
+           for (Map.Entry<Long, Long> item : e.getValue().entrySet()) {
+             long timeStamp = item.getKey();
+             // the map value is the counting() result, i.e. the daily pageview count
+             long sum = item.getValue();
+             metricDatum.setTimestamp(timeStamp);
+             metricDatum.setMetric(sum);
+           }
+           metricDatum.setMetricDimension(url);
+           // NOTE(review): the key suffix is the literal string "ip", not a per-record value.
+           return new JetInputOutputFormat<String, MetricDatum>(url + "_" + "ip", metricDatum);
+         })
+         .peek()
+         .drainTo(jetEngine.createDataSink(outStore));
+
+     JetInstance jet = Jet.newJetInstance();
+     try {
+       jet.newJob(p).join();
+     } finally {
+       // always stop the embedded Jet member(s), even when the job fails
+       Jet.shutdownAll();
+     }
+   }
+
+   /**
+    * Rolls up the given timestamp to the day cardinality, so that
+    * data can be aggregated daily
+    *
+    * @param timeStamp timestamp in milliseconds
+    * @return the timestamp truncated down to a multiple of {@link #DAY_MILIS}
+    */
+   private static long getDay(long timeStamp) {
+     return (timeStamp / DAY_MILIS) * DAY_MILIS;
+   }
+
+ }
diff --git a/pom.xml b/pom.xml
index 6710e88..8f82b20 100755
--- a/pom.xml
+++ b/pom.xml
@@ -801,6 +801,7 @@
<module>gora-kudu</module>
<module>gora-hive</module>
<module>gora-redis</module>
+ <module>gora-jet</module>
<module>gora-tutorial</module>
<module>gora-benchmark</module>
<module>sources-dist</module>
@@ -856,7 +857,8 @@
<!-- JCache Dependencies -->
<jsr107.api.version>1.0.0</jsr107.api.version>
- <hazelcast.version>3.6.4</hazelcast.version>
+ <hazelcast.version>3.12.2</hazelcast.version>
+ <hazelcast.jet.version>3.1</hazelcast.jet.version>
<!-- OrientDB Dependencies -->
<orientdb.version>2.2.22</orientdb.version>
@@ -973,6 +975,12 @@
<dependency>
<groupId>org.apache.gora</groupId>
+ <artifactId>gora-jet</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.gora</groupId>
<artifactId>gora-hbase</artifactId>
<version>${project.version}</version>
</dependency>
@@ -1763,6 +1771,12 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.hazelcast.jet</groupId>
+ <artifactId>hazelcast-jet</artifactId>
+ <version>${hazelcast.jet.version}</version>
+ </dependency>
+
<!--Aerospike Dependency -->
<dependency>
<groupId>com.aerospike</groupId>