You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/06/04 18:36:29 UTC
[12/50] [abbrv] GORA-321. Merge GORA_94 into Gora trunk
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
index b03afce..e69de29 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.serializers;
-
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import me.prettyprint.cassandra.serializers.AbstractSerializer;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.hector.api.Serializer;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.specific.SpecificFixed;
-import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.ListGenericArray;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A GenericArraySerializer translates the byte[] to and from GenericArray of Avro.
- */
-public class GenericArraySerializer<T> extends AbstractSerializer<GenericArray<T>> {
-
- public static final Logger LOG = LoggerFactory.getLogger(GenericArraySerializer.class);
-
- private static Map<Type, GenericArraySerializer> elementTypeToSerializerMap = new HashMap<Type, GenericArraySerializer>();
- private static Map<Class, GenericArraySerializer> fixedClassToSerializerMap = new HashMap<Class, GenericArraySerializer>();
-
- public static GenericArraySerializer get(Type elementType) {
- GenericArraySerializer serializer = elementTypeToSerializerMap.get(elementType);
- if (serializer == null) {
- serializer = new GenericArraySerializer(elementType);
- elementTypeToSerializerMap.put(elementType, serializer);
- }
- return serializer;
- }
-
- public static GenericArraySerializer get(Type elementType, Class clazz) {
- if (elementType != Type.FIXED) {
- return null;
- }
- GenericArraySerializer serializer = elementTypeToSerializerMap.get(clazz);
- if (serializer == null) {
- serializer = new GenericArraySerializer(clazz);
- fixedClassToSerializerMap.put(clazz, serializer);
- }
- return serializer;
- }
-
- public static GenericArraySerializer get(Schema elementSchema) {
- Type type = elementSchema.getType();
- if (type == Type.FIXED) {
- return get(Type.FIXED, TypeUtils.getClass(elementSchema));
- } else {
- return get(type);
- }
- }
-
- private Schema elementSchema = null;
- private Type elementType = null;
- private int size = -1;
- private Class<T> clazz = null;
- private Serializer<T> elementSerializer = null;
-
- public GenericArraySerializer(Serializer<T> elementSerializer) {
- this.elementSerializer = elementSerializer;
- }
-
- public GenericArraySerializer(Schema elementSchema) {
- this.elementSchema = elementSchema;
- elementType = elementSchema.getType();
- size = TypeUtils.getFixedSize(elementSchema);
- elementSerializer = GoraSerializerTypeInferer.getSerializer(elementSchema);
- }
-
- public GenericArraySerializer(Type elementType) {
- this.elementType = elementType;
- if (elementType != Type.FIXED) {
- elementSchema = Schema.create(elementType);
- }
- clazz = TypeUtils.getClass(elementType);
- size = TypeUtils.getFixedSize(elementType);
- elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
- }
-
- public GenericArraySerializer(Class<T> clazz) {
- this.clazz = clazz;
- elementType = TypeUtils.getType(clazz);
- size = TypeUtils.getFixedSize(clazz);
- if (elementType == null || elementType == Type.FIXED) {
- elementType = Type.FIXED;
- elementSchema = TypeUtils.getSchema(clazz);
- elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType, clazz);
- } else {
- elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
- }
- }
-
- @Override
- public ByteBuffer toByteBuffer(GenericArray<T> array) {
- if (array == null) {
- return null;
- }
- if (size > 0) {
- return toByteBufferWithFixedLengthElements(array);
- } else {
- return toByteBufferWithVariableLengthElements(array);
- }
- }
-
- private ByteBuffer toByteBufferWithFixedLengthElements(GenericArray<T> array) {
- ByteBuffer byteBuffer = ByteBuffer.allocate((int) array.size() * size);
- for (T element : array) {
- byteBuffer.put(elementSerializer.toByteBuffer(element));
- }
- byteBuffer.rewind();
- return byteBuffer;
- }
-
- private ByteBuffer toByteBufferWithVariableLengthElements(GenericArray<T> array) {
- int n = (int) array.size();
- List<byte[]> list = new ArrayList<byte[]>(n);
- n *= 4;
- for (T element : array) {
- byte[] bytes = BytesArraySerializer.get().fromByteBuffer(elementSerializer.toByteBuffer(element));
- list.add(bytes);
- n += bytes.length;
- }
- ByteBuffer byteBuffer = ByteBuffer.allocate(n);
- for (byte[] bytes : list) {
- byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
- byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
- }
- byteBuffer.rewind();
- return byteBuffer;
- }
-
- @Override
- public GenericArray<T> fromByteBuffer(ByteBuffer byteBuffer) {
- if (byteBuffer == null) {
- return null;
- }
- GenericArray<T> array = new ListGenericArray<T>(elementSchema);
-int i = 0;
- while (true) {
- T element = null;
- try {
- if (size > 0) {
- element = elementSerializer.fromByteBuffer(byteBuffer);
- }
- else {
- int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
- byte[] bytes = new byte[n];
- byteBuffer.get(bytes, 0, n);
- element = elementSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
- }
- } catch (BufferUnderflowException e) {
- break;
- }
- if (element == null) {
- break;
- }
- array.add(element);
- }
- return array;
- }
-
- @Override
- public ComparatorType getComparatorType() {
- return elementSerializer.getComparatorType();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
index 15ebf23..c95d51c 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
@@ -19,16 +19,18 @@
package org.apache.gora.cassandra.serializers;
import java.nio.ByteBuffer;
+import java.util.Map;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.serializers.BooleanSerializer;
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
import me.prettyprint.cassandra.serializers.DoubleSerializer;
import me.prettyprint.cassandra.serializers.FloatSerializer;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.ObjectSerializer;
import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Serializer;
import org.apache.avro.Schema;
@@ -36,9 +38,7 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificFixed;
import org.apache.avro.util.Utf8;
-
-import org.apache.gora.persistency.StatefulHashMap;
-
+import org.apache.gora.persistency.Persistent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +56,7 @@ public class GoraSerializerTypeInferer {
if (value == null) {
serializer = ByteBufferSerializer.get();
} else if (value instanceof Utf8) {
- serializer = Utf8Serializer.get();
+ serializer = CharSequenceSerializer.get();
} else if (value instanceof Boolean) {
serializer = BooleanSerializer.get();
} else if (value instanceof ByteBuffer) {
@@ -80,18 +80,21 @@ public class GoraSerializerTypeInferer {
if (schema.getType() == Type.ARRAY) {
schema = schema.getElementType();
}
- serializer = GenericArraySerializer.get(schema);
- } else if (value instanceof StatefulHashMap) {
- StatefulHashMap map = (StatefulHashMap)value;
+ serializer = ListSerializer.get(schema);
+ } else if (value instanceof Map) {
+ Map map = (Map)value;
if (map.size() == 0) {
serializer = ByteBufferSerializer.get();
}
else {
Object value0 = map.values().iterator().next();
Schema schema = TypeUtils.getSchema(value0);
- serializer = StatefulHashMapSerializer.get(schema);
+ serializer = MapSerializer.get(schema);
}
- } else {
+ } else if (value instanceof Persistent){
+ serializer = ObjectSerializer.get();
+ }
+ else {
serializer = SerializerTypeInferer.getSerializer(value);
}
return serializer;
@@ -101,7 +104,7 @@ public class GoraSerializerTypeInferer {
public static <T> Serializer<T> getSerializer(Class<?> valueClass) {
Serializer serializer = null;
if (valueClass.equals(Utf8.class)) {
- serializer = Utf8Serializer.get();
+ serializer = CharSequenceSerializer.get();
} else if (valueClass.equals(Boolean.class) || valueClass.equals(boolean.class)) {
serializer = BooleanSerializer.get();
} else if (valueClass.equals(ByteBuffer.class)) {
@@ -126,30 +129,32 @@ public class GoraSerializerTypeInferer {
public static <T> Serializer<T> getSerializer(Schema schema) {
Serializer serializer = null;
Type type = schema.getType();
- if (type == Type.STRING) {
- serializer = Utf8Serializer.get();
- } else if (type == Type.BOOLEAN) {
+ if (type.equals(Type.STRING)) {
+ serializer = CharSequenceSerializer.get();
+ } else if (type.equals(Type.BOOLEAN)) {
serializer = BooleanSerializer.get();
- } else if (type == Type.BYTES) {
+ } else if (type.equals(Type.BYTES)) {
serializer = ByteBufferSerializer.get();
- } else if (type == Type.DOUBLE) {
+ } else if (type.equals(Type.DOUBLE)) {
serializer = DoubleSerializer.get();
- } else if (type == Type.FLOAT) {
+ } else if (type.equals(Type.FLOAT)) {
serializer = FloatSerializer.get();
- } else if (type == Type.INT) {
+ } else if (type.equals(Type.INT)) {
serializer = IntegerSerializer.get();
- } else if (type == Type.LONG) {
+ } else if (type.equals(Type.LONG)) {
serializer = LongSerializer.get();
- } else if (type == Type.FIXED) {
+ } else if (type.equals(Type.FIXED)) {
Class clazz = TypeUtils.getClass(schema);
serializer = SpecificFixedSerializer.get(clazz);
// serializer = SpecificFixedSerializer.get(schema);
- } else if (type == Type.ARRAY) {
- serializer = GenericArraySerializer.get(schema.getElementType());
- } else if (type == Type.MAP) {
- serializer = StatefulHashMapSerializer.get(schema.getValueType());
- } else if (type == Type.UNION){
+ } else if (type.equals(Type.ARRAY)) {
+ serializer = ListSerializer.get(schema.getElementType());
+ } else if (type.equals(Type.MAP)) {
+ serializer = MapSerializer.get(schema.getValueType());
+ } else if (type.equals(Type.UNION)){
serializer = ByteBufferSerializer.get();
+ } else if (type.equals(Type.RECORD)){
+ serializer = BytesArraySerializer.get();
} else {
serializer = null;
}
@@ -160,7 +165,7 @@ public class GoraSerializerTypeInferer {
public static <T> Serializer<T> getSerializer(Type type) {
Serializer serializer = null;
if (type == Type.STRING) {
- serializer = Utf8Serializer.get();
+ serializer = CharSequenceSerializer.get();
} else if (type == Type.BOOLEAN) {
serializer = BooleanSerializer.get();
} else if (type == Type.BYTES) {
@@ -197,9 +202,9 @@ public class GoraSerializerTypeInferer {
}
if (type == Type.ARRAY) {
- serializer = GenericArraySerializer.get(elementType);
+ serializer = ListSerializer.get(elementType);
} else if (type == Type.MAP) {
- serializer = StatefulHashMapSerializer.get(elementType);
+ serializer = MapSerializer.get(elementType);
} else {
serializer = null;
}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java
new file mode 100644
index 0000000..5745be9
--- /dev/null
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ListSerializer translates the byte[] to and from a List of Avro elements.
+ */
+public class ListSerializer<T> extends AbstractSerializer<List<T>> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ListSerializer.class);
+
+ private static Map<Type, ListSerializer> elementTypeToSerializerMap = new HashMap<Type, ListSerializer>();
+ private static Map<Class, ListSerializer> fixedClassToSerializerMap = new HashMap<Class, ListSerializer>();
+
+ public static ListSerializer get(Type elementType) { // returns the cached serializer for this element type, creating it on first use
+ ListSerializer serializer = elementTypeToSerializerMap.get(elementType);
+ if (serializer == null) {
+ serializer = new ListSerializer(elementType);
+ elementTypeToSerializerMap.put(elementType, serializer); // remembered so later calls get the same instance
+ }
+ return serializer;
+ }
+
+ public static ListSerializer get(Type elementType, Class clazz) { // cached serializer for lists of a FIXED class; null for any other element type
+ if (elementType != Type.FIXED) {
+ return null;
+ }
+ ListSerializer serializer = fixedClassToSerializerMap.get(clazz); // consult the class-keyed cache; the Type-keyed map can never hold a Class key
+ if (serializer == null) {
+ serializer = new ListSerializer(clazz);
+ fixedClassToSerializerMap.put(clazz, serializer);
+ }
+ return serializer;
+ }
+
+ public static ListSerializer get(Schema elementSchema) { // picks the cached serializer matching the element schema's type
+ Type type = elementSchema.getType();
+ if (type == Type.FIXED) {
+ return get(Type.FIXED, TypeUtils.getClass(elementSchema)); // FIXED serializers are looked up by the class resolved from the schema
+ } else {
+ return get(type); // every other type is looked up by Type alone; the schema itself is not passed on
+ }
+ }
+
+ private Schema elementSchema = null;
+ private Type elementType = null;
+ private int size = -1;
+ private Class<T> clazz = null;
+ private Serializer<T> elementSerializer = null;
+
+ public ListSerializer(Serializer<T> elementSerializer) { // wraps an explicit element serializer
+ this.elementSerializer = elementSerializer; // size keeps its default of -1, so toByteBuffer takes the variable-length path
+ }
+
+ public ListSerializer(Schema elementSchema) { // configures the serializer from an Avro element schema
+ this.elementSchema = elementSchema;
+ elementType = elementSchema.getType();
+ size = TypeUtils.getFixedSize(elementSchema); // presumably positive only for fixed-width element types — TODO confirm in TypeUtils
+ elementSerializer = GoraSerializerTypeInferer.getSerializer(elementSchema);
+ }
+
+ @SuppressWarnings("unchecked")
+ public ListSerializer(Type elementType) { // configures the serializer from an Avro element type only
+ this.elementType = elementType;
+ if (elementType != Type.FIXED) {
+ elementSchema = Schema.create(elementType); // FIXED is skipped here, leaving elementSchema null
+ }
+ clazz = (Class<T>) TypeUtils.getClass(elementType); // unchecked cast: T is not known at this point
+ size = TypeUtils.getFixedSize(elementType); // presumably positive only for fixed-width element types — TODO confirm in TypeUtils
+ elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType);
+ }
+
+ public ListSerializer(Class<T> clazz) { // configures the serializer from the element's Java class
+ this.clazz = clazz;
+ elementType = TypeUtils.getType(clazz);
+ size = TypeUtils.getFixedSize(clazz);
+ if (elementType == null || elementType == Type.FIXED) { // a class with no resolved Type is treated as FIXED
+ elementType = Type.FIXED;
+ elementSchema = TypeUtils.getSchema(clazz);
+ elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType, clazz);
+ } else {
+ elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType); // elementSchema stays null on this path
+ }
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer(List<T> array) { // serializes the list; a null list maps to a null buffer
+ if (array == null) {
+ return null;
+ }
+ if (size > 0) { // a positive size selects the packed encoding without length prefixes
+ return toByteBufferWithFixedLengthElements(array);
+ } else {
+ return toByteBufferWithVariableLengthElements(array); // otherwise every element carries a length prefix
+ }
+ }
+
+ private ByteBuffer toByteBufferWithFixedLengthElements(List<T> array) { // packs the elements back to back with no per-element header
+ ByteBuffer byteBuffer = ByteBuffer.allocate((int) array.size() * size); // assumes each serialized element occupies exactly size bytes — TODO confirm
+ for (T element : array) {
+ byteBuffer.put(elementSerializer.toByteBuffer(element));
+ }
+ byteBuffer.rewind(); // reset the position so callers read from the start
+ return byteBuffer;
+ }
+
+ private ByteBuffer toByteBufferWithVariableLengthElements(List<T> array) { // writes each element as a length prefix followed by its bytes
+ int n = (int) array.size();
+ List<byte[]> list = new ArrayList<byte[]>(n);
+ n *= 4; // room for one length prefix per element; assumes IntegerSerializer emits 4 bytes — TODO confirm
+ for (T element : array) { // first pass: serialize every element and total up the payload size
+ byte[] bytes = BytesArraySerializer.get().fromByteBuffer(elementSerializer.toByteBuffer(element));
+ list.add(bytes);
+ n += bytes.length;
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+ for (byte[] bytes : list) { // second pass: emit length, then payload, for each element
+ byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+ byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+ }
+ byteBuffer.rewind(); // reset the position so callers read from the start
+ return byteBuffer;
+ }
+
+ @Override
+ public List<T> fromByteBuffer(ByteBuffer byteBuffer) { // decodes a list written by toByteBuffer; a null buffer maps to a null list
+ if (byteBuffer == null) {
+ return null;
+ }
+ ArrayList<T> array = new ArrayList<T>();
+ while (true) { // keep decoding elements until one of the break conditions below is hit
+ T element = null;
+ try {
+ if (size > 0) {
+ element = elementSerializer.fromByteBuffer(byteBuffer); // relies on the element serializer advancing the buffer position — TODO confirm
+ }
+ else {
+ int n = IntegerSerializer.get().fromByteBuffer(byteBuffer); // length prefix written by the variable-length encoder
+ byte[] bytes = new byte[n];
+ byteBuffer.get(bytes, 0, n);
+ element = elementSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+ }
+ } catch (BufferUnderflowException e) { // running out of bytes ends the decoding loop
+ break;
+ }
+ if (element == null) { // a null element also ends decoding
+ break;
+ }
+ array.add(element);
+ }
+ return array;
+ }
+
+ @Override
+ public ComparatorType getComparatorType() { // reports the comparator type of the element serializer
+ return elementSerializer.getComparatorType();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java
new file mode 100644
index 0000000..59af1d1
--- /dev/null
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.cassandra.serializers;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A MapSerializer translates the byte[] to and from Map of Avro.
+ */
+public class MapSerializer<T> extends AbstractSerializer<Map<CharSequence, T>> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(MapSerializer.class);
+
+ private static Map<Type, MapSerializer> valueTypeToSerializerMap = new HashMap<Type, MapSerializer>();
+ private static Map<Class, MapSerializer> fixedClassToSerializerMap = new HashMap<Class, MapSerializer>();
+
+ public static MapSerializer get(Type valueType) { // returns the cached serializer for this value type, creating it on first use
+ MapSerializer serializer = valueTypeToSerializerMap.get(valueType);
+ if (serializer == null) {
+ serializer = new MapSerializer(valueType);
+ valueTypeToSerializerMap.put(valueType, serializer); // remembered so later calls get the same instance
+ }
+ return serializer;
+ }
+
+ public static MapSerializer get(Type valueType, Class clazz) { // cached serializer for maps with FIXED-class values; null for any other value type
+ if (valueType != Type.FIXED) {
+ return null;
+ }
+ MapSerializer serializer = fixedClassToSerializerMap.get(clazz); // consult the class-keyed cache; the Type-keyed map can never hold a Class key
+ if (serializer == null) {
+ serializer = new MapSerializer(clazz);
+ fixedClassToSerializerMap.put(clazz, serializer);
+ }
+ return serializer;
+ }
+
+ public static MapSerializer get(Schema valueSchema) { // picks the cached serializer matching the value schema's type
+ Type type = valueSchema.getType();
+ if (type == Type.FIXED) {
+ return get(Type.FIXED, TypeUtils.getClass(valueSchema)); // FIXED serializers are looked up by the class resolved from the schema
+ } else {
+ return get(type); // every other type is looked up by Type alone; the schema itself is not passed on
+ }
+ }
+
+ private Schema valueSchema = null;
+ private Type valueType = null;
+ private int size = -1;
+ private Class<T> clazz = null;
+ private Serializer<T> valueSerializer = null;
+
+ public MapSerializer(Serializer<T> valueSerializer) { // wraps an explicit value serializer
+ this.valueSerializer = valueSerializer; // size keeps its default of -1, so toByteBuffer takes the variable-length path
+ }
+
+ public MapSerializer(Schema valueSchema) { // configures the serializer from an Avro value schema
+ this.valueSchema = valueSchema;
+ valueType = valueSchema.getType();
+ size = TypeUtils.getFixedSize(valueSchema); // presumably positive only for fixed-width value types — TODO confirm in TypeUtils
+ valueSerializer = GoraSerializerTypeInferer.getSerializer(valueSchema);
+ }
+
+ @SuppressWarnings("unchecked")
+ public MapSerializer(Type valueType) { // configures the serializer from an Avro value type only
+ this.valueType = valueType;
+ if (valueType != Type.FIXED) {
+ valueSchema = Schema.create(valueType); // FIXED is skipped here, leaving valueSchema null
+ }
+ clazz = (Class<T>) TypeUtils.getClass(valueType); // unchecked cast: T is not known at this point
+ size = TypeUtils.getFixedSize(valueType); // presumably positive only for fixed-width value types — TODO confirm in TypeUtils
+ valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
+ }
+
+ public MapSerializer(Class<T> clazz) { // configures the serializer from the value's Java class
+ this.clazz = clazz;
+ valueType = TypeUtils.getType(clazz);
+ size = TypeUtils.getFixedSize(clazz);
+ if (valueType == null || valueType == Type.FIXED) { // a class with no resolved Type is treated as FIXED
+ valueType = Type.FIXED;
+ valueSchema = TypeUtils.getSchema(clazz);
+ valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType, clazz);
+ } else {
+ valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType); // valueSchema stays null on this path
+ }
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer(Map<CharSequence, T> map) { // serializes the map; a null map maps to a null buffer
+ if (map == null) {
+ return null;
+ }
+ if (size > 0) { // a positive size selects the encoding where values carry no length prefix
+ return toByteBufferWithFixedLengthElements(map);
+ } else {
+ return toByteBufferWithVariableLengthElements(map); // otherwise both key and value carry a length prefix
+ }
+ }
+
+ private ByteBuffer toByteBufferWithFixedLengthElements(Map<CharSequence, T> map) { // writes each entry as key length, key bytes, then raw value bytes
+ int n = (int) map.size();
+ List<byte[]> list = new ArrayList<byte[]>(n); // n is only the initial capacity; the list ends up holding a key and a value per entry
+ n *= 4; // room for one length prefix per entry (keys only); assumes IntegerSerializer emits 4 bytes — TODO confirm
+ for (CharSequence key : map.keySet()) { // first pass: serialize keys and values alternately and total up the payload size
+ T value = map.get(key);
+ byte[] bytes = BytesArraySerializer.get().fromByteBuffer(CharSequenceSerializer.get().toByteBuffer(key));
+ list.add(bytes);
+ n += bytes.length;
+ bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
+ list.add(bytes);
+ n += bytes.length;
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+ int i = 0;
+ for (byte[] bytes : list) {
+ if (i % 2 == 0) { // even positions are keys, and only keys get a length prefix
+ byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+ }
+ byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+ i += 1;
+ }
+ byteBuffer.rewind(); // reset the position so callers read from the start
+ return byteBuffer;
+ }
+
+ private ByteBuffer toByteBufferWithVariableLengthElements(Map<CharSequence, T> map) { // writes each entry as length-prefixed key followed by length-prefixed value
+ int n = (int) map.size();
+ List<byte[]> list = new ArrayList<byte[]>(n); // n is only the initial capacity; the list ends up holding a key and a value per entry
+ n *= 8; // room for two length prefixes per entry; assumes IntegerSerializer emits 4 bytes — TODO confirm
+ for (CharSequence key : map.keySet()) { // first pass: serialize keys and values alternately and total up the payload size
+ T value = map.get(key);
+ byte[] bytes = BytesArraySerializer.get().fromByteBuffer(CharSequenceSerializer.get().toByteBuffer(key));
+ list.add(bytes);
+ n += bytes.length;
+ bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
+ list.add(bytes);
+ n += bytes.length;
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(n);
+ for (byte[] bytes : list) { // second pass: emit length, then payload, for every key and value
+ byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
+ byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
+ }
+ byteBuffer.rewind(); // reset the position so callers read from the start
+ return byteBuffer;
+ }
+
+ @Override
+ public Map<CharSequence, T> fromByteBuffer(ByteBuffer byteBuffer) { // decodes a map written by toByteBuffer; a null buffer maps to a null map
+ if (byteBuffer == null) {
+ return null;
+ }
+ Map<CharSequence, T> map = new HashMap<CharSequence, T>();
+ while (true) { // keep decoding entries until one of the break conditions below is hit
+ CharSequence key = null;
+ T value = null;
+ try {
+ int n = IntegerSerializer.get().fromByteBuffer(byteBuffer); // keys are length-prefixed in both encodings
+ byte[] bytes = new byte[n];
+ byteBuffer.get(bytes, 0, n);
+ key = CharSequenceSerializer.get().fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+
+ if (size > 0) {
+ value = valueSerializer.fromByteBuffer(byteBuffer); // relies on the value serializer advancing the buffer position — TODO confirm
+ }
+ else {
+ n = IntegerSerializer.get().fromByteBuffer(byteBuffer); // value length prefix written by the variable-length encoder
+ bytes = new byte[n];
+ byteBuffer.get(bytes, 0, n);
+ value = valueSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
+ }
+ } catch (BufferUnderflowException e) { // running out of bytes ends the decoding loop
+ break;
+ }
+ if (key == null) { // a null key ends decoding
+ break;
+ }
+ if (value == null) { // a null value also ends decoding; the entry is not stored
+ break;
+ }
+ map.put(key, value);
+ }
+ return map;
+ }
+
+ @Override
+ public ComparatorType getComparatorType() { // reports the comparator type of the value serializer
+ return valueSerializer.getComparatorType();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
index b981fbf..2232c08 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java
@@ -25,15 +25,10 @@ import java.util.Map;
import me.prettyprint.cassandra.serializers.AbstractSerializer;
import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.ddl.ComparatorType;
import static me.prettyprint.hector.api.ddl.ComparatorType.BYTESTYPE;
import org.apache.avro.specific.SpecificFixed;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.util.Utf8;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
index 4922220..e69de29 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.serializers;
-
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import me.prettyprint.cassandra.serializers.AbstractSerializer;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.hector.api.Serializer;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.specific.SpecificFixed;
-import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StatefulHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StatefulHashMapSerializer translates the byte[] to and from StatefulHashMap of Avro.
- */
-public class StatefulHashMapSerializer<T> extends AbstractSerializer<StatefulHashMap<Utf8, T>> {
-
- public static final Logger LOG = LoggerFactory.getLogger(StatefulHashMapSerializer.class);
-
- private static Map<Type, StatefulHashMapSerializer> valueTypeToSerializerMap = new HashMap<Type, StatefulHashMapSerializer>();
- private static Map<Class, StatefulHashMapSerializer> fixedClassToSerializerMap = new HashMap<Class, StatefulHashMapSerializer>();
-
- public static StatefulHashMapSerializer get(Type valueType) {
- StatefulHashMapSerializer serializer = valueTypeToSerializerMap.get(valueType);
- if (serializer == null) {
- serializer = new StatefulHashMapSerializer(valueType);
- valueTypeToSerializerMap.put(valueType, serializer);
- }
- return serializer;
- }
-
- public static StatefulHashMapSerializer get(Type valueType, Class clazz) {
- if (valueType != Type.FIXED) {
- return null;
- }
- StatefulHashMapSerializer serializer = valueTypeToSerializerMap.get(clazz);
- if (serializer == null) {
- serializer = new StatefulHashMapSerializer(clazz);
- fixedClassToSerializerMap.put(clazz, serializer);
- }
- return serializer;
- }
-
- public static StatefulHashMapSerializer get(Schema valueSchema) {
- Type type = valueSchema.getType();
- if (type == Type.FIXED) {
- return get(Type.FIXED, TypeUtils.getClass(valueSchema));
- } else {
- return get(type);
- }
- }
-
- private Schema valueSchema = null;
- private Type valueType = null;
- private int size = -1;
- private Class<T> clazz = null;
- private Serializer<T> valueSerializer = null;
-
- public StatefulHashMapSerializer(Serializer<T> valueSerializer) {
- this.valueSerializer = valueSerializer;
- }
-
- public StatefulHashMapSerializer(Schema valueSchema) {
- this.valueSchema = valueSchema;
- valueType = valueSchema.getType();
- size = TypeUtils.getFixedSize(valueSchema);
- valueSerializer = GoraSerializerTypeInferer.getSerializer(valueSchema);
- }
-
- public StatefulHashMapSerializer(Type valueType) {
- this.valueType = valueType;
- if (valueType != Type.FIXED) {
- valueSchema = Schema.create(valueType);
- }
- clazz = TypeUtils.getClass(valueType);
- size = TypeUtils.getFixedSize(valueType);
- valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
- }
-
- public StatefulHashMapSerializer(Class<T> clazz) {
- this.clazz = clazz;
- valueType = TypeUtils.getType(clazz);
- size = TypeUtils.getFixedSize(clazz);
- if (valueType == null || valueType == Type.FIXED) {
- valueType = Type.FIXED;
- valueSchema = TypeUtils.getSchema(clazz);
- valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType, clazz);
- } else {
- valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType);
- }
- }
-
- @Override
- public ByteBuffer toByteBuffer(StatefulHashMap<Utf8, T> map) {
- if (map == null) {
- return null;
- }
- if (size > 0) {
- return toByteBufferWithFixedLengthElements(map);
- } else {
- return toByteBufferWithVariableLengthElements(map);
- }
- }
-
- private ByteBuffer toByteBufferWithFixedLengthElements(StatefulHashMap<Utf8, T> map) {
- List<byte[]> list = new ArrayList<byte[]>(map.size());
- int n = 0;
- for (Utf8 key : map.keySet()) {
- if (map.getState(key) == State.DELETED) {
- continue;
- }
- T value = map.get(key);
- byte[] bytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
- list.add(bytes);
- n += 4;
- n += bytes.length;
- bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
- list.add(bytes);
- n += bytes.length;
- }
- ByteBuffer byteBuffer = ByteBuffer.allocate(n);
- int i = 0;
- for (byte[] bytes : list) {
- if (i % 2 == 0) {
- byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
- }
- byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
- i += 1;
- }
- byteBuffer.rewind();
- return byteBuffer;
- }
-
- private ByteBuffer toByteBufferWithVariableLengthElements(StatefulHashMap<Utf8, T> map) {
- List<byte[]> list = new ArrayList<byte[]>(map.size());
- int n = 0;
- for (Utf8 key : map.keySet()) {
- if (map.getState(key) == State.DELETED) {
- continue;
- }
- T value = map.get(key);
- byte[] bytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
- list.add(bytes);
- n += 4;
- n += bytes.length;
- bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
- list.add(bytes);
- n += 4;
- n += bytes.length;
- }
- ByteBuffer byteBuffer = ByteBuffer.allocate(n);
- for (byte[] bytes : list) {
- byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length));
- byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes));
- }
- byteBuffer.rewind();
- return byteBuffer;
- }
-
- @Override
- public StatefulHashMap<Utf8, T> fromByteBuffer(ByteBuffer byteBuffer) {
- if (byteBuffer == null) {
- return null;
- }
- StatefulHashMap<Utf8, T> map = new StatefulHashMap<Utf8, T>();
-int i = 0;
- while (true) {
- Utf8 key = null;
- T value = null;
- try {
- int n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
- byte[] bytes = new byte[n];
- byteBuffer.get(bytes, 0, n);
- key = Utf8Serializer.get().fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
-
- if (size > 0) {
- value = valueSerializer.fromByteBuffer(byteBuffer);
- }
- else {
- n = IntegerSerializer.get().fromByteBuffer(byteBuffer);
- bytes = new byte[n];
- byteBuffer.get(bytes, 0, n);
- value = valueSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) );
- }
- } catch (BufferUnderflowException e) {
- break;
- }
- if (key == null) {
- break;
- }
- if (value == null) {
- break;
- }
- map.put(key, value);
- }
- return map;
- }
-
- @Override
- public ComparatorType getComparatorType() {
- return valueSerializer.getComparatorType();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
index 48973d0..c5db72b 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
@@ -19,16 +19,15 @@
package org.apache.gora.cassandra.serializers;
import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificFixed;
import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.ListGenericArray;
import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.StatefulHashMap;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,13 +40,13 @@ public class TypeUtils {
public static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class);
// @SuppressWarnings({ "rawtypes", "unchecked" })
- public static Class getClass(Object value) {
+ public static Class<? extends Object> getClass(Object value) {
return value.getClass();
}
public static Schema getSchema(Object value) {
if (value instanceof GenericArray) {
- return Schema.createArray( getElementSchema((GenericArray)value) );
+ return Schema.createArray( getElementSchema((GenericArray<?>)value) );
} else {
return getSchema( getClass(value) );
}
@@ -72,9 +71,9 @@ public class TypeUtils {
return Type.INT;
} else if (clazz.equals(Long.class) || clazz.equals(long.class)) {
return Type.LONG;
- } else if (clazz.equals(ListGenericArray.class)) {
+ } else if (clazz.isAssignableFrom(List.class)) {
return Type.ARRAY;
- } else if (clazz.equals(StatefulHashMap.class)) {
+ } else if (clazz.isAssignableFrom(Map.class)) {
return Type.MAP;
} else if (clazz.equals(Persistent.class)) {
return Type.RECORD;
@@ -85,7 +84,7 @@ public class TypeUtils {
}
}
- public static Class getClass(Type type) {
+ public static Class<?> getClass(Type type) {
if (type == Type.STRING) {
return Utf8.class;
} else if (type == Type.BOOLEAN) {
@@ -101,9 +100,9 @@ public class TypeUtils {
} else if (type == Type.LONG) {
return Long.class;
} else if (type == Type.ARRAY) {
- return ListGenericArray.class;
+ return List.class;
} else if (type == Type.MAP) {
- return StatefulHashMap.class;
+ return Map.class;
} else if (type == Type.RECORD) {
return Persistent.class;
} else if (type == Type.FIXED) {
@@ -114,7 +113,7 @@ public class TypeUtils {
}
}
- public static Schema getSchema(Class clazz) {
+ public static Schema getSchema(Class<?> clazz) {
Type type = getType(clazz);
if (type == null) {
return null;
@@ -157,7 +156,7 @@ public class TypeUtils {
}
}
- public static Class getClass(Schema schema) {
+ public static Class<?> getClass(Schema schema) {
Type type = schema.getType();
if (type == null) {
return null;
@@ -198,7 +197,7 @@ public class TypeUtils {
}
}
- public static int getFixedSize(Class clazz) {
+ public static int getFixedSize(Class<?> clazz) {
Type type = getType(clazz);
if (type == Type.FIXED) {
try {
@@ -215,15 +214,11 @@ public class TypeUtils {
}
}
- public static Schema getElementSchema(GenericArray array) {
+ public static Schema getElementSchema(GenericArray<?> array) {
Schema schema = array.getSchema();
return (schema.getType() == Type.ARRAY) ? schema.getElementType() : schema;
}
- public static Type getElementType(ListGenericArray array) {
- return getElementSchema(array).getType();
- }
-
/*
public static Schema getValueSchema(StatefulHashMap map) {
return map.getSchema().getValueType();
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
index 19e3668..e69de29 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.serializers;
-
-import java.nio.ByteBuffer;
-
-import me.prettyprint.cassandra.serializers.AbstractSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE;
-
-import org.apache.avro.util.Utf8;
-
-/**
- * A Utf8Serializer translates the byte[] to and from Utf8 object of Avro.
- */
-public final class Utf8Serializer extends AbstractSerializer<Utf8> {
-
- private static final Utf8Serializer instance = new Utf8Serializer();
-
- public static Utf8Serializer get() {
- return instance;
- }
-
- @Override
- public ByteBuffer toByteBuffer(Utf8 obj) {
- if (obj == null) {
- return null;
- }
- return StringSerializer.get().toByteBuffer(obj.toString());
- }
-
- @Override
- public Utf8 fromByteBuffer(ByteBuffer byteBuffer) {
- if (byteBuffer == null) {
- return null;
- }
- return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
- }
-
- @Override
- public ComparatorType getComparatorType() {
- return UTF8TYPE;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
index ddf3de0..416c017 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -18,16 +18,11 @@
package org.apache.gora.cassandra.store;
-import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl;
-import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl;
-import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
@@ -52,62 +47,27 @@ import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Serializer;
import org.apache.avro.generic.GenericArray;
-import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
import org.apache.gora.mapreduce.GoraRecordReader;
import org.apache.gora.persistency.impl.PersistentBase;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * CassandraClient is where all of the primary datastore functionality is
- * executed. Typically CassandraClient is invoked by calling
- * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}.
- * CassandraClient deals with Cassandra data model definition, mutation,
- * and general/specific mappings.
- * @see {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}
- *
- * @param <K>
- * @param <T>
- */
public class CassandraClient<K, T extends PersistentBase> {
-
- /** The logging implementation */
public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
-
+
private Cluster cluster;
private Keyspace keyspace;
private Mutator<K> mutator;
private Class<K> keyClass;
private Class<T> persistentClass;
-
- /** Object containing the XML mapping for Cassandra. */
+
private CassandraMapping cassandraMapping = null;
- /** Hector client default column family consistency level. */
- public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM";
-
- /** Cassandra serializer to be used for serializing Gora's keys. */
private Serializer<K> keySerializer;
-
- /**
- * Given our key, persistentClass from
- * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}
- * we make best efforts to dictate our data model.
- * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace(String)
- * to see if our keyspace has already been invented, this simple check prevents us from
- * recreating the keyspace if it already exists.
- * We then simple specify (based on the input keyclass) an appropriate serializer
- * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before
- * defining a mutator from and by which we can mutate this object.
- * @param keyClass the Key by which we wish o assign a record object
- * @param persistentClass the generated {@link org.apache.org.gora.persistency.Peristent} bean representing the data.
- * @throws Exception
- */
+
public void initialize(Class<K> keyClass, Class<T> persistentClass) throws Exception {
this.keyClass = keyClass;
@@ -115,16 +75,14 @@ public class CassandraClient<K, T extends PersistentBase> {
this.persistentClass = persistentClass;
this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass);
- this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(),
- new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
-
+ this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
+
// add keyspace to cluster
checkKeyspace();
-
- // Just create a Keyspace object on the client side, corresponding to an
- // already existing keyspace with already created column families.
+
+ // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
-
+
this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
}
@@ -136,21 +94,13 @@ public class CassandraClient<K, T extends PersistentBase> {
KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
return (keyspaceDefinition != null);
}
-
+
/**
* Check if keyspace already exists. If not, create it.
- * In this method, we also utilize Hector's
- * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel}
- * logic.
- * It is set by passing a
- * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} object right
- * when the {@link me.prettyprint.hector.api.Keyspace} is created.
- * If we cannot find a consistency level within <code>gora.properites</code>,
- * then column family consistency level is set to QUORUM (by default) which permits
- * consistency to wait for a quorum of replicas to respond regardless of data center.
- * QUORUM is Hector Client's default setting and we respect that here as well.
- *
- * @see http://hector-client.github.io/hector/build/html/content/consistency_level.html
+ * In this method, we also utilise Hector's {@link ConfigurableConsistencyLevel}
+ * logic. It is set by passing a ConfigurableConsistencyLevel object right
+ * when the Keyspace is created. Currently the consistency level is ONE, which
+ * means an operation waits until one replica has responded.
*/
public void checkKeyspace() {
// "describe keyspace <keyspaceName>;" query
@@ -164,25 +114,24 @@ public class CassandraClient<K, T extends PersistentBase> {
}
keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(),
- "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
+ "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
this.cluster.addKeyspace(keyspaceDefinition, true);
+ // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
+
+ // Create a customized Consistency Level
+ ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();
+ Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>();
- // GORA-167 Create a customized Consistency Level
- ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
- Map<String, HConsistencyLevel> clmap = getConsisLevelForColFams(columnFamilyDefinitions);
- // Column family consistency levels
- ccl.setReadCfConsistencyLevels(clmap);
- ccl.setWriteCfConsistencyLevels(clmap);
- // Operations consistency levels
- String opConsisLvl = (readOpConsLvl!=null || !readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
- ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
- LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'.");
- opConsisLvl = (writeOpConsLvl!=null || !writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
- ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
- LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'.");
+ // Define CL.ONE for ColumnFamily "ColumnFamily"
+ clmap.put("ColumnFamily", HConsistencyLevel.ONE);
+
+ // Here we use CL.ONE for both reads and writes, but different CLs can be used if needed.
+ configurableConsistencyLevel.setReadCfConsistencyLevels(clmap);
+ configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap);
// Then let the keyspace know
- HFactory.createKeyspace("Keyspace", this.cluster, ccl);
+ HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel);
+
keyspaceDefinition = null;
}
else {
@@ -196,10 +145,11 @@ public class CassandraClient<K, T extends PersistentBase> {
if (! comparatorType.equals(ComparatorType.BYTESTYPE)) {
// GORA-197
LOG.warn("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName()
- + ", not BytesType. It may cause a fatal error on column validation later.");
+ + ", not BytesType. It may cause a fatal error on column validation later.");
}
else {
- // LOG.info("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName() + ".");
+ LOG.debug("The comparator type of " + cfDef.getName() + " column family is "
+ + comparatorType.getTypeName() + ".");
}
}
}
@@ -207,26 +157,9 @@ public class CassandraClient<K, T extends PersistentBase> {
}
/**
- * Method in charge of setting the consistency level for defined column families.
- * @param pColFams Column families
- * @return Map<String, HConsistencyLevel> with the mapping between colFams and consistency level.
- */
- private Map<String, HConsistencyLevel> getConsisLevelForColFams(List<ColumnFamilyDefinition> pColFams) {
- Map<String, HConsistencyLevel> clMap = new HashMap<String, HConsistencyLevel>();
- // Get columnFamily consistency level.
- String colFamConsisLvl = (colFamConsLvl != null && !colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
- LOG.debug("ColumnFamily consistency level configured to '" + colFamConsisLvl + "'.");
- // Define consistency for ColumnFamily "ColumnFamily"
- for (ColumnFamilyDefinition colFamDef : pColFams)
- clMap.put(colFamDef.getName(), HConsistencyLevel.valueOf(colFamConsisLvl));
- return clMap;
- }
-
- /**
* Drop keyspace.
*/
public void dropKeyspace() {
- // "drop keyspace <keyspaceName>;" query
this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
}
@@ -245,21 +178,40 @@ public class CassandraClient<K, T extends PersistentBase> {
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String columnName = this.cassandraMapping.getColumn(fieldName);
String ttlAttr = this.cassandraMapping.getColumnsAttribs().get(fieldName);
- if (ttlAttr == null) {
+ if (ttlAttr == null)
ttlAttr = CassandraMapping.DEFAULT_COLUMNS_TTL;
- }
if (columnName == null) {
LOG.warn("Column name is null for field=" + fieldName + " with value=" + value.toString());
return;
}
-
synchronized(mutator) {
HectorUtils.insertColumn(mutator, key, columnFamily, columnName, byteBuffer, ttlAttr);
}
}
/**
+ * Delete a row within the keyspace.
+ * @param key the row key
+ * @param familyName the column family name
+ * @param columnName the column name
+ */
+ public void deleteColumn(K key, String familyName, ByteBuffer columnName) {
+ synchronized(mutator) {
+ HectorUtils.deleteColumn(mutator, key, familyName, columnName);
+ }
+ }
+
+ /**
+ * Deletes an entry based on its key.
+ * @param key
+ */
+ public void deleteByKey(K key) {
+ Map<String, String> familyMap = this.cassandraMapping.getFamilyMap();
+ deleteColumn(key, familyMap.values().iterator().next().toString(), null);
+ }
+
+ /**
* Insert a member in a super column. This is used for map and record Avro types.
* @param key the row key
* @param fieldName the field name
@@ -272,14 +224,12 @@ public class CassandraClient<K, T extends PersistentBase> {
}
ByteBuffer byteBuffer = toByteBuffer(value);
-
+
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String superColumnName = this.cassandraMapping.getColumn(fieldName);
String ttlAttr = this.cassandraMapping.getColumnsAttribs().get(fieldName);
- if (ttlAttr == null) {
+ if (ttlAttr == null)
ttlAttr = CassandraMapping.DEFAULT_COLUMNS_TTL;
- }
-
synchronized(mutator) {
HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, columnName, byteBuffer, ttlAttr);
}
@@ -318,43 +268,39 @@ public class CassandraClient<K, T extends PersistentBase> {
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String superColumnName = this.cassandraMapping.getColumn(fieldName);
-
+
synchronized(mutator) {
HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, columnName);
}
}
/**
- * Deletes a subColumn which is a field inside a column.
- * @param key Identifying the row.
- * @param fieldName The field's name.
- * @param columnName The column's name.
+ * Deletes a subColumn
+ * @param key
+ * @param fieldName
+ * @param columnName
*/
public void deleteSubColumn(K key, String fieldName, String columnName) {
deleteSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName));
}
/**
- * Delete a row within the keyspace.
- * @param key
- * @param fieldName
- * @param columnName
- */
- public void deleteColumn(K key, String familyName, ByteBuffer columnName) {
+ * Deletes all subcolumns from a super column.
+ * @param key the row key.
+ * @param fieldName the field name.
+ */
+ public void deleteSubColumn(K key, String fieldName) {
+ String columnFamily = this.cassandraMapping.getFamily(fieldName);
+ String superColumnName = this.cassandraMapping.getColumn(fieldName);
synchronized(mutator) {
- HectorUtils.deleteColumn(mutator, key, familyName, columnName);
- }
+ HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, null);
}
-
- /**
- * Delete all content related to a key.
- * @param key
- */
- public void deleteByKey(K key) {
- Map<String, String> familyMap = this.cassandraMapping.getFamilyMap();
- deleteColumn(key, familyMap.values().iterator().next().toString(), null);
}
+ public void deleteGenericArray(K key, String fieldName) {
+ //TODO Verify this. Everything that goes inside a genericArray will go inside a column so let's just delete that.
+ deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName));
+ }
public void addGenericArray(K key, String fieldName, GenericArray<?> array) {
if (isSuper( cassandraMapping.getFamily(fieldName) )) {
int i= 0;
@@ -362,14 +308,15 @@ public class CassandraClient<K, T extends PersistentBase> {
// TODO: hack, do not store empty arrays
if (itemValue instanceof GenericArray<?>) {
- if (((GenericArray)itemValue).size() == 0) {
+ if (((List<?>)itemValue).size() == 0) {
continue;
}
- } else if (itemValue instanceof StatefulHashMap<?,?>) {
- if (((StatefulHashMap)itemValue).size() == 0) {
+ } else if (itemValue instanceof Map<?,?>) {
+ if (((Map<?, ?>)itemValue).size() == 0) {
continue;
}
}
+
addSubColumn(key, fieldName, i++, itemValue);
}
}
@@ -378,27 +325,35 @@ public class CassandraClient<K, T extends PersistentBase> {
}
}
- public void addStatefulHashMap(K key, String fieldName, StatefulHashMap<Utf8,Object> map) {
+ public void deleteStatefulHashMap(K key, String fieldName) {
if (isSuper( cassandraMapping.getFamily(fieldName) )) {
- int i= 0;
- for (Utf8 mapKey: map.keySet()) {
- if (map.getState(mapKey) == State.DELETED) {
- deleteSubColumn(key, fieldName, mapKey.toString());
- continue;
- }
+ deleteSubColumn(key, fieldName);
+ } else {
+ deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName));
+ }
+ }
- // TODO: hack, do not store empty arrays
- Object mapValue = map.get(mapKey);
- if (mapValue instanceof GenericArray<?>) {
- if (((GenericArray)mapValue).size() == 0) {
- continue;
- }
- } else if (mapValue instanceof StatefulHashMap<?,?>) {
- if (((StatefulHashMap)mapValue).size() == 0) {
- continue;
+ public void addStatefulHashMap(K key, String fieldName, Map<CharSequence,Object> map) {
+ if (isSuper( cassandraMapping.getFamily(fieldName) )) {
+ // We don't know what has changed inside the map (or whether it is now empty), so delete its existing content first.
+ deleteSubColumn(key, fieldName);
+ // update if there is anything to update.
+ if (!map.isEmpty()) {
+ // If it's not empty, then update its content.
+ for (CharSequence mapKey: map.keySet()) {
+ // TODO: hack, do not store empty arrays
+ Object mapValue = map.get(mapKey);
+ if (mapValue instanceof GenericArray<?>) {
+ if (((List<?>)mapValue).size() == 0) {
+ continue;
+ }
+ } else if (mapValue instanceof Map<?,?>) {
+ if (((Map<?, ?>)mapValue).size() == 0) {
+ continue;
+ }
}
+ addSubColumn(key, fieldName, mapKey.toString(), mapValue);
}
- addSubColumn(key, fieldName, mapKey.toString(), mapValue);
}
}
else {
@@ -435,7 +390,7 @@ public class CassandraClient<K, T extends PersistentBase> {
* @return a list of family rows
*/
public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
-
+
String[] columnNames = cassandraQuery.getColumns(family);
ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
@@ -448,87 +403,98 @@ public class CassandraClient<K, T extends PersistentBase> {
}
K startKey = query.getStartKey();
K endKey = query.getEndKey();
-
- RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery =
- HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
+
+ RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get());
rangeSlicesQuery.setColumnFamily(family);
rangeSlicesQuery.setKeys(startKey, endKey);
rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
rangeSlicesQuery.setRowCount(limit);
rangeSlicesQuery.setColumnNames(columnNameByteBuffers);
-
+
QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = rangeSlicesQuery.execute();
OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
-
+
return orderedRows.getList();
}
-
+
private String getMappingFamily(String pField){
String family = null;
- // TODO checking if it was a UNION field the one we are retrieving
- family = this.cassandraMapping.getFamily(pField);
- return family;
- }
-
+ // check whether the field we are retrieving is a UNION field
+ if (pField.indexOf(CassandraStore.UNION_COL_SUFIX) > 0)
+ family = this.cassandraMapping.getFamily(pField.substring(0,pField.indexOf(CassandraStore.UNION_COL_SUFIX)));
+ else
+ family = this.cassandraMapping.getFamily(pField);
+ return family;
+ }
+
private String getMappingColumn(String pField){
String column = null;
- // TODO checking if it was a UNION field the one we are retrieving e.g. column = pField;
- column = this.cassandraMapping.getColumn(pField);
- return column;
- }
+ if (pField.indexOf(CassandraStore.UNION_COL_SUFIX) > 0)
+ column = pField;
+ else
+ column = this.cassandraMapping.getColumn(pField);
+ return column;
+ }
/**
* Select the families that contain at least one column mapped to a query field.
* @param query indicates the columns to select
- * @return a map which keys are the family names and values the corresponding column
- * names required to get all the query fields.
+ * @return a map whose keys are the family names and whose values are the
+ * corresponding column names required to get all the query fields.
*/
public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
Map<String, List<String>> map = new HashMap<String, List<String>>();
for (String field: query.getFields()) {
String family = this.getMappingFamily(field);
String column = this.getMappingColumn(field);
-
+
// check if the family value was already initialized
List<String> list = map.get(family);
if (list == null) {
list = new ArrayList<String>();
map.put(family, list);
}
-
if (column != null) {
list.add(column);
}
-
}
-
+
return map;
}
/**
- * Retrieves the cassandraMapping which holds whatever was mapped from the gora-cassandra-mapping.xml
- * @return
+ * Retrieves the cassandraMapping which holds whatever was mapped
+ * from the gora-cassandra-mapping.xml
+ * @return the CassandraMapping held by this client
*/
public CassandraMapping getCassandraMapping(){
return this.cassandraMapping;
}
-
+
/**
- * Select the field names according to the column names, which format if fully qualified: "family:column"
+ * Select the field names according to the column names, whose format
+ * is fully qualified: "family:column"
* @param query
- * @return a map which keys are the fully qualified column names and values the query fields
+ * @return a map whose keys are the fully qualified column
+ * names and whose values are the query fields
*/
public Map<String, String> getReverseMap(Query<K, T> query) {
Map<String, String> map = new HashMap<String, String>();
for (String field: query.getFields()) {
String family = this.getMappingFamily(field);
String column = this.getMappingColumn(field);
-
+
map.put(family + ":" + column, field);
}
+
return map;
}
+ /**
+ * Determines if a column family is a super column family or not.
+ * @param family
+ * @return boolean
+ */
public boolean isSuper(String family) {
return this.cassandraMapping.isSuper(family);
}
@@ -542,19 +508,20 @@ public class CassandraClient<K, T extends PersistentBase> {
}
K startKey = query.getStartKey();
K endKey = query.getEndKey();
-
- RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery =
- HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(),
- ByteBufferSerializer.get(), ByteBufferSerializer.get());
+
+ RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
rangeSuperSlicesQuery.setColumnFamily(family);
rangeSuperSlicesQuery.setKeys(startKey, endKey);
rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
rangeSuperSlicesQuery.setRowCount(limit);
rangeSuperSlicesQuery.setColumnNames(columnNames);
-
+
+
QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute();
OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
return orderedRows.getList();
+
+
}
/**
@@ -562,6 +529,6 @@ public class CassandraClient<K, T extends PersistentBase> {
* @return Keyspace
*/
public String getKeyspaceName() {
- return this.cassandraMapping.getKeyspaceName();
+ return this.cassandraMapping.getKeyspaceName();
}
}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
index 9a0f492..f8d5315 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
@@ -53,6 +53,7 @@ public class CassandraMapping {
private String clusterName;
private String keyspaceName;
+
/**
* List of the super column families.
*/
@@ -168,9 +169,10 @@ public class CassandraMapping {
LOG.warn("Using default set to: " + DEFAULT_GCGRACE_SECONDS);
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Located gc_grace_seconds: '" + gcgrace_scs + "'" );
+ LOG.debug("Located gc_grace_seconds: '" + gcgrace_scs + "'" );
}
}
+
String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
if (superAttribute != null) {
if (LOG.isDebugEnabled()) {
@@ -183,11 +185,12 @@ public class CassandraMapping {
cfDef.setColumnType(ColumnType.SUPER);
cfDef.setSubComparatorType(ComparatorType.BYTESTYPE);
}
-
+
cfDef.setKeyspaceName(this.keyspaceName);
cfDef.setName(familyName);
cfDef.setComparatorType(ComparatorType.BYTESTYPE);
cfDef.setDefaultValidationClass(ComparatorType.BYTESTYPE.getClassName());
+
cfDef.setGcGraceSeconds(gcgrace_scs!=null?Integer.parseInt(gcgrace_scs):DEFAULT_GCGRACE_SECONDS);
this.columnFamilyDefinitions.put(familyName, cfDef);
@@ -200,7 +203,6 @@ public class CassandraMapping {
String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
String ttlValue = element.getAttributeValue(COLUMNS_TTL_ATTRIBUTE);
-
if (fieldName == null) {
LOG.error("Field name is not declared.");
continue;
@@ -221,55 +223,37 @@ public class CassandraMapping {
if (columnFamilyDefinition == null) {
LOG.warn("Family " + familyName + " was not declared in the keyspace.");
}
-
+
this.familyMap.put(fieldName, familyName);
this.columnMap.put(fieldName, columnName);
- // TODO we should find a way of storing more values into this map i.e. more column attributes
+ // TODO we should find a way of storing more values into this map
this.columnAttrMap.put(columnName, ttlValue!=null?ttlValue:DEFAULT_COLUMNS_TTL);
}
}
/**
- * Add new column to CassandraMapping using the self-explanatory parameters
- * @param pFamilyName
- * @param pFieldName
- * @param pColumnName
+ * Add a new column to the CassandraMapping using the parameters below
+ * @param pFamilyName the column family name
+ * @param pFieldName the Avro field from the Schema
+ * @param pColumnName the column name within the column family.
*/
public void addColumn(String pFamilyName, String pFieldName, String pColumnName){
this.familyMap.put(pFieldName, pFamilyName);
this.columnMap.put(pFieldName, pColumnName);
}
- /**
- * Gets the columnFamily related to the column name.
- * @param name
- * @return
- */
public String getFamily(String name) {
return this.familyMap.get(name);
}
- /**
- * Gets the column related to a field.
- * @param name
- * @return
- */
public String getColumn(String name) {
return this.columnMap.get(name);
}
- /**
- * Gets all the columnFamilies defined.
- * @return
- */
public Map<String,String> getFamilyMap(){
return this.familyMap;
}
- /**
- * Gets all attributes related to a column.
- * @return
- */
public Map<String, String> getColumnsAttribs(){
return this.columnAttrMap;
}
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
----------------------------------------------------------------------
diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
index 0c77abc..536c8ce 100644
--- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
@@ -49,7 +49,7 @@ public class CassandraMappingManager {
return manager;
}
- /**
+ /**
* Objects to maintain mapped keyspaces
*/
private Map<String, Element> keyspaceMap = null;
@@ -78,7 +78,7 @@ public class CassandraMappingManager {
}
String keyspaceName = mappingElement.getAttributeValue(KEYSPACE_ELEMENT);
if (LOG.isDebugEnabled()) {
- LOG.debug("className=" + className + " -> keyspaceName=" + keyspaceName);
+ LOG.debug("persistentClassName=" + className + " -> keyspaceName=" + keyspaceName);
}
Element keyspaceElement = keyspaceMap.get(keyspaceName);
if (keyspaceElement == null) {