You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/11 18:57:31 UTC
samza git commit: SAMZA-484;
define serialization for tuples in samza-sql
Repository: samza
Updated Branches:
refs/heads/master efb9795a2 -> eedf2e720
SAMZA-484; define serialization for tuples in samza-sql
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/eedf2e72
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/eedf2e72
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/eedf2e72
Branch: refs/heads/master
Commit: eedf2e7204fc01e32bd21c454ff83a36a23f6105
Parents: efb9795
Author: Navina Ramesh <na...@gmail.com>
Authored: Wed Feb 11 09:56:32 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Feb 11 09:56:32 2015 -0800
----------------------------------------------------------------------
build.gradle | 10 +-
gradle/dependency-versions.gradle | 1 +
samza-sql/README | 1 -
samza-sql/README.md | 1 +
.../org/apache/samza/sql/api/data/Data.java | 54 ++++
.../org/apache/samza/sql/api/data/Schema.java | 55 ++++
.../org/apache/samza/sql/api/data/Tuple.java | 4 +-
.../samza/sql/data/IncomingMessageTuple.java | 9 +-
.../apache/samza/sql/data/avro/AvroData.java | 262 ++++++++++++++++
.../apache/samza/sql/data/avro/AvroSchema.java | 296 +++++++++++++++++++
.../sql/data/serializers/SqlStringSerde.java | 45 +++
.../data/serializers/SqlStringSerdeFactory.java | 33 +++
.../samza/sql/data/string/StringData.java | 101 +++++++
.../samza/sql/data/string/StringSchema.java | 73 +++++
.../sql/operators/partition/PartitionOp.java | 5 +-
15 files changed, 932 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e6b10fc..b49c313 100644
--- a/build.gradle
+++ b/build.gradle
@@ -78,7 +78,7 @@ rat {
'gradlew',
'gradlew.bat',
'samza-test/state/mystore/**',
- 'README.md',
+ '**/README.md',
'RELEASE.md',
]
}
@@ -249,18 +249,12 @@ project(":samza-yarn_$scalaVersion") {
project(":samza-sql_$scalaVersion") {
apply plugin: 'java'
- configurations {
- // Remove transitive dependencies from Zookeeper that we don't want.
- compile.exclude group: 'javax.jms', module: 'jms'
- compile.exclude group: 'com.sun.jdmk', module: 'jmxtools'
- compile.exclude group: 'com.sun.jmx', module: 'jmxri'
- }
-
dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
compile project(":samza-kv_$scalaVersion")
compile "commons-collections:commons-collections:$commonsCollectionVersion"
+ compile "org.apache.avro:avro:$avroVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 1e76901..2bb24ad 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -35,4 +35,5 @@
guavaVersion = "17.0"
commonsCodecVersion = "1.9"
commonsCollectionVersion = "3.2.1"
+ avroVersion = "1.7.7"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/README
----------------------------------------------------------------------
diff --git a/samza-sql/README b/samza-sql/README
deleted file mode 100644
index 65b7558..0000000
--- a/samza-sql/README
+++ /dev/null
@@ -1 +0,0 @@
-samza-sql is an experimental module that is under development (SAMZA-390).
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/README.md
----------------------------------------------------------------------
diff --git a/samza-sql/README.md b/samza-sql/README.md
new file mode 100644
index 0000000..598670b
--- /dev/null
+++ b/samza-sql/README.md
@@ -0,0 +1 @@
+samza-sql is an experimental module that is under development (SAMZA-390).
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
new file mode 100644
index 0000000..d1b8409
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Data.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+import java.util.List;
+import java.util.Map;
+
+
+public interface Data {
+
+ Schema schema();
+
+ Object value();
+
+ int intValue();
+
+ long longValue();
+
+ float floatValue();
+
+ double doubleValue();
+
+ boolean booleanValue();
+
+ String strValue();
+
+ byte[] bytesValue();
+
+ List<Object> arrayValue();
+
+ Map<Object, Object> mapValue();
+
+ Data getElement(int index);
+
+ Data getFieldData(String fldName);
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
new file mode 100644
index 0000000..1e8f192
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Schema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.api.data;
+
+import java.util.Map;
+
+
+public interface Schema {
+
+ enum Type {
+ INTEGER,
+ LONG,
+ FLOAT,
+ DOUBLE,
+ BOOLEAN,
+ STRING,
+ BYTES,
+ STRUCT,
+ ARRAY,
+ MAP
+ };
+
+ Type getType();
+
+ Schema getElementType();
+
+ Schema getValueType();
+
+ Map<String, Schema> getFields();
+
+ Schema getFieldType(String fldName);
+
+ Data read(Object object);
+
+ Data transform(Data inputData);
+
+ boolean equals(Schema other);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
index 0c21a53..bc8efcf 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java
@@ -32,7 +32,7 @@ public interface Tuple {
*
* @return Message object in the tuple
*/
- Object getMessage();
+ Data getMessage();
/**
* Method to indicate whether the tuple is a delete tuple or an insert tuple
@@ -46,7 +46,7 @@ public interface Tuple {
*
* @return The <code>key</code> of the tuple
*/
- Object getKey();
+ Data getKey();
/**
* Get the stream name of the tuple. Note this stream name should be unique in the system.
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
index a8a55e2..f868e5c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.sql.data;
+import org.apache.samza.sql.api.data.Data;
import org.apache.samza.sql.api.data.EntityName;
import org.apache.samza.sql.api.data.Tuple;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -52,8 +53,8 @@ public class IncomingMessageTuple implements Tuple {
// TODO: the return type should be changed to the generic data type
@Override
- public Object getMessage() {
- return this.imsg.getMessage();
+ public Data getMessage() {
+ return (Data) this.imsg.getMessage();
}
@Override
@@ -62,8 +63,8 @@ public class IncomingMessageTuple implements Tuple {
}
@Override
- public Object getKey() {
- return imsg.getKey();
+ public Data getKey() {
+ return (Data) this.imsg.getKey();
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
new file mode 100644
index 0000000..d040be9
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroData.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.avro;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+
+public class AvroData implements Data {
+ protected final Object datum;
+ protected final AvroSchema schema;
+
+ private AvroData(AvroSchema schema, Object datum) {
+ this.datum = datum;
+ this.schema = schema;
+ }
+
+ @Override
+ public Schema schema() {
+ return this.schema;
+ }
+
+ @Override
+ public Object value() {
+ return this.datum;
+ }
+
+ @Override
+ public int intValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public long longValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public float floatValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public double doubleValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public boolean booleanValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public String strValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public byte[] bytesValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public List<Object> arrayValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public Map<Object, Object> mapValue() {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public Data getElement(int index) {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ @Override
+ public Data getFieldData(String fldName) {
+ throw new UnsupportedOperationException("Can't get value for an unknown data type.");
+ }
+
+ public static AvroData getArray(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.ARRAY) {
+ throw new IllegalArgumentException("Can't create an array object with non-array schema:" + schema.getType());
+ }
+ return new AvroData(schema, datum) {
+ @SuppressWarnings("unchecked")
+ private final GenericArray<Object> array = (GenericArray<Object>) this.datum;
+
+ @Override
+ public List<Object> arrayValue() {
+ return this.array;
+ }
+
+ @Override
+ public Data getElement(int index) {
+ return this.schema.getElementType().read(array.get(index));
+ }
+
+ };
+ }
+
+ public static AvroData getMap(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.MAP) {
+ throw new IllegalArgumentException("Can't create a map object with non-map schema:" + schema.getType());
+ }
+ return new AvroData(schema, datum) {
+ @SuppressWarnings("unchecked")
+ private final Map<Object, Object> map = (Map<Object, Object>) datum;
+
+ @Override
+ public Map<Object, Object> mapValue() {
+ return this.map;
+ }
+
+ @Override
+ public Data getFieldData(String fldName) {
+ // TODO Auto-generated method stub
+ return this.schema.getValueType().read(map.get(fldName));
+ }
+
+ };
+ }
+
+ public static AvroData getStruct(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.STRUCT) {
+ throw new IllegalArgumentException("Can't create a struct object with non-struct schema:" + schema.getType());
+ }
+ return new AvroData(schema, datum) {
+ private final GenericRecord record = (GenericRecord) datum;
+
+ @Override
+ public Data getFieldData(String fldName) {
+ // TODO Auto-generated method stub
+ return this.schema.getFieldType(fldName).read(record.get(fldName));
+ }
+
+ };
+ }
+
+ public static AvroData getInt(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.INTEGER || !(datum instanceof Integer)) {
+ throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+ + datum.getClass().getName());
+ }
+ return new AvroData(schema, datum) {
+ @Override
+ public int intValue() {
+ return ((Integer) datum).intValue();
+ }
+
+ };
+ }
+
+ public static AvroData getLong(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.LONG || !(datum instanceof Long)) {
+ throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+ + datum.getClass().getName());
+ }
+ return new AvroData(schema, datum) {
+ @Override
+ public long longValue() {
+ return ((Long) datum).longValue();
+ }
+
+ };
+ }
+
+ public static AvroData getFloat(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.FLOAT || !(datum instanceof Float)) {
+ throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+ + datum.getClass().getName());
+ }
+ return new AvroData(schema, datum) {
+ @Override
+ public float floatValue() {
+ return ((Float) datum).floatValue();
+ }
+
+ };
+ }
+
+ public static AvroData getDouble(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.DOUBLE || !(datum instanceof Double)) {
+ throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+ + datum.getClass().getName());
+ }
+ return new AvroData(schema, datum) {
+ @Override
+ public double doubleValue() {
+ return ((Double) datum).doubleValue();
+ }
+
+ };
+ }
+
+ public static AvroData getBoolean(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.BOOLEAN || !(datum instanceof Boolean)) {
+ throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+ + datum.getClass().getName());
+ }
+ return new AvroData(schema, datum) {
+ @Override
+ public boolean booleanValue() {
+ return ((Boolean) datum).booleanValue();
+ }
+
+ };
+ }
+
+ public static AvroData getString(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.STRING || !(datum instanceof CharSequence)) {
+ throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+ + datum.getClass().getName());
+ }
+ return new AvroData(schema, datum) {
+ @Override
+ public String strValue() {
+ return ((CharSequence) datum).toString();
+ }
+
+ };
+ }
+
+ public static AvroData getBytes(AvroSchema schema, Object datum) {
+ if (schema.getType() != Schema.Type.BYTES || !(datum instanceof ByteBuffer)) {
+ throw new IllegalArgumentException("data object and schema mismatch. schema:" + schema.getType() + ", data: "
+ + datum.getClass().getName());
+ }
+ return new AvroData(schema, datum) {
+ @Override
+ public byte[] bytesValue() {
+ return ((ByteBuffer) datum).array();
+ }
+
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
new file mode 100644
index 0000000..577cf74
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.avro;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema.Field;
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+
+public class AvroSchema implements Schema {
+
+ protected final org.apache.avro.Schema avroSchema;
+ protected final Schema.Type type;
+
+ private final static Map<org.apache.avro.Schema.Type, AvroSchema> primSchemas =
+ new HashMap<org.apache.avro.Schema.Type, AvroSchema>();
+
+ static {
+ primSchemas.put(org.apache.avro.Schema.Type.INT,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getInt(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.LONG,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getLong(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.FLOAT,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.FLOAT)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getFloat(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.DOUBLE,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getDouble(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.BOOLEAN,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getBoolean(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.STRING,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getString(this, datum);
+ }
+ });
+ primSchemas.put(org.apache.avro.Schema.Type.BYTES,
+ new AvroSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)) {
+ @Override
+ public Data read(Object datum) {
+ return AvroData.getBytes(this, datum);
+ }
+ });
+ };
+
+ public static AvroSchema getSchema(final org.apache.avro.Schema schema) {
+ Schema.Type type = mapType(schema.getType());
+ if (type != Schema.Type.ARRAY && type != Schema.Type.MAP && type != Schema.Type.STRUCT) {
+ return primSchemas.get(schema.getType());
+ }
+ // otherwise, construct the new schema
+ // TODO: It would be possible to assign each complex schema an ID and cache it w/o repeated create in-memory schema objects
+ switch (type) {
+ case ARRAY:
+ return new AvroSchema(schema) {
+ @Override
+ public Data transform(Data input) {
+ // This would get all the elements until the length of the current schema's array length
+ if (input.schema().getType() != Schema.Type.ARRAY) {
+ throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getType());
+ }
+ if (!input.schema().getElementType().equals(this.getElementType())) {
+ throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getElementType().getType());
+ }
+ // input type matches array type
+ return AvroData.getArray(this, input.value());
+ }
+ };
+ case MAP:
+ return new AvroSchema(schema) {
+ @Override
+ public Data transform(Data input) {
+ // This would get all the elements until the length of the current schema's array length
+ if (input.schema().getType() != Schema.Type.MAP) {
+ throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getType());
+ }
+ if (!input.schema().getValueType().equals(this.getValueType())) {
+ throw new IllegalArgumentException("Element schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getValueType().getType());
+ }
+ // input type matches map type
+ return AvroData.getMap(this, input.value());
+ }
+ };
+ case STRUCT:
+ return new AvroSchema(schema) {
+ @SuppressWarnings("serial")
+ private final Map<String, Schema> fldSchemas = new HashMap<String, Schema>() {
+ {
+ for (Field field : schema.getFields()) {
+ put(field.name(), getSchema(field.schema()));
+ }
+ }
+ };
+
+ @Override
+ public Map<String, Schema> getFields() {
+ return this.fldSchemas;
+ }
+
+ @Override
+ public Schema getFieldType(String fldName) {
+ return this.fldSchemas.get(fldName);
+ }
+
+ @Override
+ public Data transform(Data input) {
+ // This would get all the elements until the length of the current schema's array length
+ if (input.schema().getType() != Schema.Type.STRUCT) {
+ throw new IllegalArgumentException("Schema mismatch. Can't transfer data. input schema: "
+ + input.schema().getType());
+ }
+ // Note: this particular transform function only implements "projection to a sub-set" concept.
+ // More complex function is needed if some other concepts such as "merge from two sets of data", "allow null if does not exist" are needed
+ for (String fldName : this.fldSchemas.keySet()) {
+ // check each field schema matches input
+ Schema fldSchema = this.fldSchemas.get(fldName);
+ Schema inputFld = input.schema().getFieldType(fldName);
+ if (!fldSchema.equals(inputFld)) {
+ throw new IllegalArgumentException("Field schema mismatch. Can't transfer data for field " + fldName
+ + ". input field schema:" + inputFld.getType() + ", this field schema: " + fldSchema.getType());
+ }
+ }
+ // input type matches struct type
+ return AvroData.getStruct(this, input.value());
+ }
+
+ };
+ default:
+ throw new IllegalArgumentException("Un-recognized complext data type:" + type);
+ }
+ }
+
+ private AvroSchema(org.apache.avro.Schema schema) {
+ this.avroSchema = schema;
+ this.type = mapType(schema.getType());
+ }
+
+ private static Type mapType(org.apache.avro.Schema.Type type) {
+ switch (type) {
+ case ARRAY:
+ return Schema.Type.ARRAY;
+ case RECORD:
+ return Schema.Type.STRUCT;
+ case MAP:
+ return Schema.Type.MAP;
+ case INT:
+ return Schema.Type.INTEGER;
+ case LONG:
+ return Schema.Type.LONG;
+ case BOOLEAN:
+ return Schema.Type.BOOLEAN;
+ case FLOAT:
+ return Schema.Type.FLOAT;
+ case DOUBLE:
+ return Schema.Type.DOUBLE;
+ case STRING:
+ return Schema.Type.STRING;
+ case BYTES:
+ return Schema.Type.BYTES;
+ default:
+ throw new IllegalArgumentException("Avro schema: " + type + " is not supported");
+ }
+ }
+
+ @Override
+ public Type getType() {
+ return this.type;
+ }
+
+ @Override
+ public Schema getElementType() {
+ if (this.type != Schema.Type.ARRAY) {
+ throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
+ }
+ return getSchema(this.avroSchema.getElementType());
+ }
+
+ @Override
+ public Schema getValueType() {
+ if (this.type != Schema.Type.MAP) {
+ throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+ }
+ return getSchema(this.avroSchema.getValueType());
+ }
+
+ @Override
+ public Map<String, Schema> getFields() {
+ throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
+ }
+
+ @Override
+ public Schema getFieldType(String fldName) {
+ throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+ }
+
+ @Override
+ public Data read(Object object) {
+ if (this.avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY) {
+ return AvroData.getArray(this, object);
+ } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.MAP) {
+ return AvroData.getMap(this, object);
+ } else if (this.avroSchema.getType() == org.apache.avro.Schema.Type.RECORD) {
+ return AvroData.getStruct(this, object);
+ }
+ throw new UnsupportedOperationException("Reading unknown complext type:" + this.type + " is not supported");
+ }
+
+ @Override
+ public Data transform(Data inputData) {
+ if (inputData.schema().getType() == Schema.Type.ARRAY || inputData.schema().getType() == Schema.Type.MAP
+ || inputData.schema().getType() == Schema.Type.STRUCT) {
+ throw new IllegalArgumentException("Complex schema should have overriden the default transform() function.");
+ }
+ if (inputData.schema().getType() != this.type) {
+ throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+ + ", input type:" + inputData.schema().getType());
+ }
+ return inputData;
+ }
+
+ @Override
+ public boolean equals(Schema other) {
+ // TODO Auto-generated method stub
+ if (this.type != other.getType()) {
+ return false;
+ }
+ switch (this.type) {
+ case ARRAY:
+ // check if element types are the same
+ return this.getElementType().equals(other.getElementType());
+ case MAP:
+ // check if value types are the same
+ return this.getValueType().equals(other.getValueType());
+ case STRUCT:
+ // check if the fields schemas in this equals the other
+ // NOTE: this equals check is in consistent with the "projection to subset" concept implemented in transform()
+ for (String fieldName : this.getFields().keySet()) {
+ if (!this.getFieldType(fieldName).equals(other.getFieldType(fieldName))) {
+ return false;
+ }
+ }
+ return true;
+ default:
+ return true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
new file mode 100644
index 0000000..1f0c3b2
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.serializers;
+
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.sql.data.string.StringData;
+
+import java.io.UnsupportedEncodingException;
+
+public class SqlStringSerde implements Serde<StringData> {
+
+ private final Serde<String> serde;
+
+ public SqlStringSerde(String encoding) {
+ this.serde = new StringSerde(encoding);
+ }
+
+ @Override
+ public StringData fromBytes(byte[] bytes) {
+ return new StringData(serde.fromBytes(bytes));
+ }
+
+ @Override
+ public byte[] toBytes(StringData object) {
+ return serde.toBytes(object.strValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
new file mode 100644
index 0000000..2564479
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.serializers;
+
+
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.sql.data.string.StringData;
+
+public class SqlStringSerdeFactory implements SerdeFactory<StringData> {
+ @Override
+ public Serde<StringData> getSerde(String name, Config config) {
+ return new SqlStringSerde(config.get("encoding", "UTF-8"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
new file mode 100644
index 0000000..b81d9fa
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringData.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.string;
+
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+import java.util.List;
+import java.util.Map;
+
+public class StringData implements Data {
+ private final Object datum;
+ private final Schema schema;
+
+ public StringData(Object datum) {
+ this.datum = datum;
+ this.schema = new StringSchema();
+ }
+
+ @Override
+ public Schema schema() {
+ return this.schema;
+ }
+
+ @Override
+ public Object value() {
+ return this.datum;
+ }
+
+ @Override
+ public int intValue() {
+ throw new UnsupportedOperationException("Can't get int value for a string type data");
+ }
+
+ @Override
+ public long longValue() {
+ throw new UnsupportedOperationException("Can't get long value for a string type data");
+ }
+
+ @Override
+ public float floatValue() {
+ throw new UnsupportedOperationException("Can't get float value for a string type data");
+ }
+
+ @Override
+ public double doubleValue() {
+ throw new UnsupportedOperationException("Can't get double value for a string type data");
+ }
+
+ @Override
+ public boolean booleanValue() {
+ throw new UnsupportedOperationException("Can't get boolean value for a string type data");
+ }
+
+ @Override
+ public String strValue() {
+ return String.valueOf(datum);
+ }
+
+ @Override
+ public byte[] bytesValue() {
+ throw new UnsupportedOperationException("Can't get bytesValue for a string type data");
+ }
+
+ @Override
+ public List<Object> arrayValue() {
+ throw new UnsupportedOperationException("Can't get arrayValue for a string type data");
+ }
+
+ @Override
+ public Map<Object, Object> mapValue() {
+ throw new UnsupportedOperationException("Can't get mapValue for a string type data");
+ }
+
+ @Override
+ public Data getElement(int index) {
+ throw new UnsupportedOperationException("Can't getElement(index) on a string type data");
+ }
+
+ @Override
+ public Data getFieldData(String fldName) {
+ throw new UnsupportedOperationException("Can't getFieldData(fieldName) for a string type data");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
new file mode 100644
index 0000000..348fc0c
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.data.string;
+
+import org.apache.samza.sql.api.data.Data;
+import org.apache.samza.sql.api.data.Schema;
+
+import java.util.Map;
+
+public class StringSchema implements Schema {
+ private Type type = Type.STRING;
+
+ @Override
+ public Type getType() {
+ return Type.STRING;
+ }
+
+ @Override
+ public Schema getElementType() {
+ throw new UnsupportedOperationException("Can't getElmentType with non-array schema: " + this.type);
+ }
+
+ @Override
+ public Schema getValueType() {
+ throw new UnsupportedOperationException("Can't getValueType with non-map schema: " + this.type);
+ }
+
+ @Override
+ public Map<String, Schema> getFields() {
+ throw new UnsupportedOperationException("Can't get field types with unknown schema type:" + this.type);
+ }
+
+ @Override
+ public Schema getFieldType(String fldName) {
+ throw new UnsupportedOperationException("Can't getFieldType with non-map/non-struct schema: " + this.type);
+ }
+
+ @Override
+ public Data read(Object object) {
+ return new StringData(object);
+ }
+
+ @Override
+ public Data transform(Data inputData) {
+ if (inputData.schema().getType() != this.type) {
+ throw new IllegalArgumentException("Can't transform a mismatched primitive type. this type:" + this.type
+ + ", input type:" + inputData.schema().getType());
+ }
+ return inputData;
+ }
+
+ @Override
+ public boolean equals(Schema other) {
+ return other.getType() == this.type;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/eedf2e72/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
index 7921d4f..986d688 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
@@ -82,9 +82,8 @@ public final class PartitionOp extends SimpleOperator implements TupleOperator {
@Override
public void process(Tuple tuple, SqlMessageCollector collector) throws Exception {
- collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey(),
- null /* TODO: when merge with Schema API changes, use: tuple
- .getMessage().getFieldData(PartitionOp.this.spec.getParKey()) */, tuple.getMessage()));
+ collector.send(new OutgoingMessageEnvelope(PartitionOp.this.spec.getSystemStream(), tuple.getKey().value(),
+ tuple.getMessage().getFieldData(PartitionOp.this.spec.getParKey()).value(), tuple.getMessage().value()));
}
}