You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2017/08/23 20:55:22 UTC
[24/37] gora git commit: Refactored the code
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java
index 6f8284d..7deb49a 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java
@@ -25,43 +25,30 @@ import java.util.Map;
*/
public class KeySpace {
- public enum PlacementStrategy {
- SimpleStrategy,
- NetworkTopologyStrategy,
- }
-
private String name;
-
private PlacementStrategy placementStrategy;
-
private boolean durableWritesEnabled;
-
private int replicationFactor;
-
private Map<String, Integer> dataCenters;
public String getName() {
return name;
}
- public boolean isDurableWritesEnabled() {
- return durableWritesEnabled;
- }
-
- public PlacementStrategy getPlacementStrategy() {
- return placementStrategy;
+ public void setName(String name) {
+ this.name = name;
}
- public int getReplicationFactor() {
- return replicationFactor;
+ public boolean isDurableWritesEnabled() {
+ return durableWritesEnabled;
}
- public Map<String, Integer> getDataCenters() {
- return dataCenters;
+ public void setDurableWritesEnabled(boolean durableWritesEnabled) {
+ this.durableWritesEnabled = durableWritesEnabled;
}
- public void addDataCenter(String key, Integer value) {
- this.dataCenters.put(key, value);
+ public PlacementStrategy getPlacementStrategy() {
+ return placementStrategy;
}
public void setPlacementStrategy(PlacementStrategy placementStrategy) {
@@ -71,15 +58,24 @@ public class KeySpace {
}
}
+ public int getReplicationFactor() {
+ return replicationFactor;
+ }
+
public void setReplicationFactor(int replicationFactor) {
this.replicationFactor = replicationFactor;
}
- public void setName(String name) {
- this.name = name;
+ public Map<String, Integer> getDataCenters() {
+ return dataCenters;
}
- public void setDurableWritesEnabled(boolean durableWritesEnabled) {
- this.durableWritesEnabled = durableWritesEnabled;
+ public void addDataCenter(String key, Integer value) {
+ this.dataCenters.put(key, value);
+ }
+
+ public enum PlacementStrategy {
+ SimpleStrategy,
+ NetworkTopologyStrategy,
}
}
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java
index a0d9c4c..3aa4e36 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java
@@ -35,7 +35,7 @@ public class PartitionKeyField extends Field {
public void setComposite(boolean composite) {
isComposite = composite;
- if(isComposite && fields == null) {
+ if (isComposite && fields == null) {
fields = new ArrayList<>();
}
}
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java
index 5247ecc..3ad9186 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java
@@ -5,15 +5,16 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
/**
* This package contains all classes related to the Cassandra datastore.
*/
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
index bd17dcd..9d6e103 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java
@@ -21,7 +21,6 @@ import com.datastax.driver.mapping.annotations.Transient;
import org.apache.avro.Schema;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.Tombstone;
-import org.apache.gora.persistency.impl.PersistentBase;
import java.util.List;
@@ -61,12 +60,6 @@ public abstract class CassandraNativePersistent implements Persistent {
@Transient
@Override
- public void setDirty(String field) {
-
- }
-
- @Transient
- @Override
public void clearDirty(int fieldIndex) {
}
@@ -103,6 +96,12 @@ public abstract class CassandraNativePersistent implements Persistent {
@Transient
@Override
+ public void setDirty(String field) {
+
+ }
+
+ @Transient
+ @Override
public void clearDirty() {
}
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
index d6ba99c..1479686 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
@@ -70,7 +70,7 @@ public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K, T> {
@Override
public String[] getFields() {
- if(updateFields.size() == 0) {
+ if (updateFields.size() == 0) {
return super.getFields();
} else {
String[] updateFieldsArray = new String[updateFields.size()];
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
index 7ab3726..c3b2e59 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,16 +19,13 @@
package org.apache.gora.cassandra.query;
import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.ResultBase;
import org.apache.gora.store.DataStore;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
/**
* CassandraResult specific implementation of the {@link org.apache.gora.query.Result}
@@ -45,7 +42,6 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
private int position = 0;
/**
- *
* @param dataStore
* @param query
*/
@@ -54,34 +50,37 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
}
/**
- *{@inheritDoc}
+ * {@inheritDoc}
+ *
* @return
* @throws IOException
*/
@Override
protected boolean nextInner() throws IOException {
- if(offset < size) {
+ if (offset < size) {
persistent = persistentObject.get(position);
key = persistentKey.get(position);
- position ++;
+ position++;
return true;
}
return false;
}
/**
- *{@inheritDoc}
+ * {@inheritDoc}
+ *
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public float getProgress() throws IOException, InterruptedException {
- return ((float)position)/size;
+ return ((float) position) / size;
}
/**
- *{@inheritDoc}
+ * {@inheritDoc}
+ *
* @return
*/
@Override
@@ -91,6 +90,7 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
/**
* {@inheritDoc}
+ *
* @return
*/
@Override
@@ -99,7 +99,6 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T
}
/**
- *
* @param key
* @param token
*/
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java
index 49faefa..275c8d9 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java
@@ -5,15 +5,16 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
/**
* This package contains the Cassandra store query representation classes as well as the result set class
* that is returned when a query is executed over the Cassandra dataStore.
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
index 5383949..7baa1b1 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
@@ -24,6 +24,7 @@ import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.bean.CassandraKey;
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.hbase.util.HBaseByteInterface;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.DirtyListWrapper;
import org.apache.gora.persistency.impl.DirtyMapWrapper;
@@ -31,6 +32,7 @@ import org.apache.gora.persistency.impl.PersistentBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -52,32 +54,26 @@ class AvroCassandraUtils {
static void processKeys(CassandraMapping cassandraMapping, Object key, List<String> keys, List<Object> values) {
CassandraKey cassandraKey = cassandraMapping.getCassandraKey();
- if (cassandraMapping.isPartitionKeyDefined()) {
- if (cassandraKey != null) {
- if (key instanceof PersistentBase) {
- PersistentBase keyBase = (PersistentBase) key;
- for (Schema.Field field : keyBase.getSchema().getFields()) {
- if (cassandraMapping.getFieldFromFieldName(field.name()) != null) {
- keys.add(field.name());
- Object value = keyBase.get(field.pos());
- value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value);
- values.add(value);
- } else {
- LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()});
- }
+ if (cassandraKey != null) {
+ if (key instanceof PersistentBase) {
+ PersistentBase keyBase = (PersistentBase) key;
+ for (Schema.Field field : keyBase.getSchema().getFields()) {
+ Field mappedField = cassandraKey.getFieldFromFieldName(field.name());
+ if (mappedField != null) {
+ keys.add(field.name());
+ Object value = keyBase.get(field.pos());
+ value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value, mappedField);
+ values.add(value);
+ } else {
+ LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()});
}
- } else {
- LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class});
}
} else {
- for (Field field : cassandraMapping.getInlinedDefinedPartitionKeys()) {
- keys.add(field.getFieldName());
- values.add(key);
- }
+ LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class});
}
} else {
- keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName());
- values.add(key.toString());
+ keys.add(cassandraMapping.getInlinedDefinedPartitionKey().getFieldName());
+ values.add(key);
}
}
@@ -91,21 +87,33 @@ class AvroCassandraUtils {
* @param fieldValue the field value.
* @return field value
*/
- static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue) {
+ static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue, Field field) {
switch (type) {
+ // A record can be persisted in two ways: as a UDT ("frozen" column type) or as serialized bytes ("blob" column type)
case RECORD:
PersistentBase persistent = (PersistentBase) fieldValue;
- PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema());
- for (Schema.Field member : fieldSchema.getFields()) {
- if (member.pos() == 0 || !persistent.isDirty()) {
- continue;
+ if (field.getType().contains("frozen")) {
+ PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema());
+ for (Schema.Field member : fieldSchema.getFields()) {
+ if (member.pos() == 0 || !persistent.isDirty()) {
+ continue;
+ }
+ Schema memberSchema = member.schema();
+ Schema.Type memberType = memberSchema.getType();
+ Object memberValue = persistent.get(member.pos());
+ newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue, field));
+ }
+ fieldValue = newRecord;
+ } else if (field.getType().contains("blob")) {
+ try {
+ byte[] serializedBytes = HBaseByteInterface.toBytes(fieldValue, fieldSchema);
+ fieldValue = ByteBuffer.wrap(serializedBytes);
+ } catch (IOException e) {
+ LOG.error("Error occurred when serializing {} field. {}", new Object[]{field.getFieldName(), e.getMessage()});
}
- Schema memberSchema = member.schema();
- Schema.Type memberType = memberSchema.getType();
- Object memberValue = persistent.get(member.pos());
- newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue));
+ } else {
+ throw new RuntimeException("");
}
- fieldValue = newRecord;
break;
case MAP:
Schema valueSchema = fieldSchema.getValueType();
@@ -114,7 +122,7 @@ class AvroCassandraUtils {
for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>) fieldValue).entrySet()) {
String mapKey = e.getKey().toString();
Object mapValue = e.getValue();
- mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue);
+ mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue, field);
map.put(mapKey, mapValue);
}
fieldValue = map;
@@ -124,7 +132,7 @@ class AvroCassandraUtils {
valuetype = valueSchema.getType();
ArrayList<Object> list = new ArrayList<>();
for (Object item : (Collection<?>) fieldValue) {
- Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item);
+ Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item, field);
list.add(value);
}
fieldValue = list;
@@ -136,7 +144,7 @@ class AvroCassandraUtils {
int schemaPos = getUnionSchema(fieldValue, fieldSchema);
Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
Schema.Type unionType = unionSchema.getType();
- fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue);
+ fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue, field);
}
break;
case STRING:
@@ -154,7 +162,7 @@ class AvroCassandraUtils {
* If no data type can be inferred then we return a default value
* of position 0.
*
- * @param pValue Object
+ * @param pValue Object
* @param pUnionSchema avro Schema
* @return the unionSchemaPosition.
*/
@@ -183,74 +191,84 @@ class AvroCassandraUtils {
return unionSchemaPos;
else if (pValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD))
return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.STRING))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.INT))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.LONG))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.DOUBLE))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FLOAT))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.BOOLEAN))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.MAP))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.ARRAY))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.ENUM))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FIXED))
+ return unionSchemaPos;
+ else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.RECORD))
+ return unionSchemaPos;
unionSchemaPos++;
}
// if we weren't able to determine which data type it is, then we return the default
return DEFAULT_UNION_SCHEMA;
}
- static String encodeFieldKey(final String key) {
- if (key == null) {
- return null;
- }
- return key.replace(".", "\u00B7")
- .replace(":", "\u00FF")
- .replace(";", "\u00FE")
- .replace(" ", "\u00FD")
- .replace("%", "\u00FC")
- .replace("=", "\u00FB");
- }
-
- static String decodeFieldKey(final String key) {
- if (key == null) {
- return null;
- }
- return key.replace("\u00B7", ".")
- .replace("\u00FF", ":")
- .replace("\u00FE", ";")
- .replace("\u00FD", " ")
- .replace("\u00FC", "%")
- .replace("\u00FB", "=");
- }
-
static Object getAvroFieldValue(Object value, Schema schema) {
Object result;
switch (schema.getType()) {
case MAP:
Map<String, Object> rawMap = (Map<String, Object>) value;
- Map<Utf8, Object> deserializableMap = new HashMap<>();
+ Map<Utf8, Object> utf8ObjectHashMap = new HashMap<>();
if (rawMap == null) {
- result = new DirtyMapWrapper(deserializableMap);
+ result = new DirtyMapWrapper(utf8ObjectHashMap);
break;
}
for (Map.Entry<?, ?> e : rawMap.entrySet()) {
Schema innerSchema = schema.getValueType();
Object obj = getAvroFieldValue(e.getValue(), innerSchema);
if (e.getKey().getClass().getSimpleName().equalsIgnoreCase("Utf8")) {
- deserializableMap.put((Utf8) e.getKey(), obj);
+ utf8ObjectHashMap.put((Utf8) e.getKey(), obj);
} else {
- deserializableMap.put(new Utf8((String) e.getKey()), obj);
+ utf8ObjectHashMap.put(new Utf8((String) e.getKey()), obj);
}
}
- result = new DirtyMapWrapper<>(deserializableMap);
+ result = new DirtyMapWrapper<>(utf8ObjectHashMap);
break;
case ARRAY:
List<Object> rawList = (List<Object>) value;
- List<Object> deserializableList = new ArrayList<>();
+ List<Object> objectArrayList = new ArrayList<>();
if (rawList == null) {
- return new DirtyListWrapper(deserializableList);
+ return new DirtyListWrapper(objectArrayList);
}
for (Object item : rawList) {
Object obj = getAvroFieldValue(item, schema.getElementType());
- deserializableList.add(obj);
+ objectArrayList.add(obj);
}
- result = new DirtyListWrapper<>(deserializableList);
+ result = new DirtyListWrapper<>(objectArrayList);
break;
case RECORD:
- result = (PersistentBase) value;
+ if (value != null && ByteBuffer.class.isAssignableFrom(value.getClass())) {
+ ByteBuffer buffer = (ByteBuffer) value;
+ byte[] arr = new byte[buffer.remaining()];
+ buffer.get(arr);
+ try {
+ result = (PersistentBase) HBaseByteInterface.fromBytes(schema, arr);
+ } catch (IOException e) {
+ LOG.error("");
+ result = null;
+ }
+ } else {
+ result = (PersistentBase) value;
+ }
break;
case UNION:
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
index 4498caf..57d03f1 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -22,6 +22,8 @@ import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import org.apache.avro.Schema;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.cassandra.bean.CassandraKey;
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.query.CassandraResultSet;
import org.apache.gora.cassandra.store.CassandraClient;
@@ -50,7 +52,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
private DataStore<K, T> cassandraDataStore;
AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) {
- super(cassandraClient, dataStore.getKeyClass(), dataStore.getKeyClass(), mapping);
+ super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
this.cassandraDataStore = dataStore;
}
@@ -70,7 +72,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
if (iterator.hasNext()) {
obj = cassandraDataStore.newPersistent();
Row row = iterator.next();
- populateValuesToPersistent(row, definitions, obj);
+ populateValuesToPersistent(row, definitions, obj, fields);
}
return obj;
}
@@ -85,13 +87,14 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
AvroCassandraUtils.processKeys(mapping, key, fields, values);
for (Schema.Field f : persistentBase.getSchema().getFields()) {
String fieldName = f.name();
- if (mapping.getFieldFromFieldName(fieldName) == null) {
+ Field field = mapping.getFieldFromFieldName(fieldName);
+ if (field == null) {
LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass});
continue;
}
- if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKeys().contains(mapping.getFieldFromFieldName(fieldName))) {
+ if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) {
Object value = persistentBase.get(f.pos());
- value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value);
+ value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field);
values.add(value);
fields.add(fieldName);
}
@@ -120,7 +123,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
if (iterator.hasNext()) {
obj = cassandraDataStore.newPersistent();
Row row = iterator.next();
- populateValuesToPersistent(row, definitions, obj);
+ populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames());
}
return obj;
}
@@ -128,104 +131,129 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
/**
* This method reads the mapped column values from the given row and sets them on the given persistent bean.
**/
- private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base) {
-
+ private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) {
Object paramValue;
- for (Schema.Field avrofield : base.getSchema().getFields()) {
-
- Field field = mapping.getFieldFromFieldName(avrofield.name());
+ for (String fieldName : fields) {
+ Schema.Field avroField = base.getSchema().getField(fieldName);
+ Field field = mapping.getFieldFromFieldName(fieldName);
//to ignore unspecified fields in the mapping
- if (field == null) {
+ if (field == null || avroField == null) {
continue;
}
- Schema fieldSchema = avrofield.schema();
+ Schema fieldSchema = avroField.schema();
String columnName = field.getColumnName();
- DataType columnType = columnDefinitions.getType(columnName);
-
- switch (columnType.getName()) {
- case ASCII:
- paramValue = row.getString(columnName);
- break;
- case BIGINT:
- paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
- break;
- case BLOB:
- paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
- break;
- case BOOLEAN:
- paramValue = row.isNull(columnName) ? null : row.getBool(columnName);
- break;
- case COUNTER:
- paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
- break;
- case DECIMAL:
- paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName);
- break;
- case DOUBLE:
- paramValue = row.isNull(columnName) ? null : row.getDouble(columnName);
- break;
- case FLOAT:
- paramValue = row.isNull(columnName) ? null : row.getFloat(columnName);
- break;
- case INET:
- paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString();
- break;
- case INT:
- paramValue = row.isNull(columnName) ? null : row.getInt(columnName);
- break;
- case TEXT:
- paramValue = row.getString(columnName);
- break;
- case TIMESTAMP:
- paramValue = row.isNull(columnName) ? null : row.getDate(columnName);
- break;
- case UUID:
- paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
- break;
- case VARCHAR:
- paramValue = row.getString(columnName);
- break;
- case VARINT:
- paramValue = row.isNull(columnName) ? null : row.getVarint(columnName);
- break;
- case TIMEUUID:
- paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
- break;
- case LIST:
- String dataType = field.getType();
- dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
- paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
- break;
- case SET:
- dataType = field.getType();
- dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
- paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
- break;
- case MAP:
- dataType = field.getType();
- dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
- dataType = dataType.split(",")[1];
- // Avro supports only String for keys
- paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType));
- break;
- case UDT:
- paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName).toString();
- break;
- case TUPLE:
- paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString();
- break;
- case CUSTOM:
- paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
- break;
- default:
- paramValue = row.getString(columnName);
- break;
- }
+ paramValue = getValue(row, columnDefinitions, columnName);
Object value = AvroCassandraUtils.getAvroFieldValue(paramValue, fieldSchema);
- base.put(avrofield.pos(), value);
+ base.put(avroField.pos(), value);
+ }
+ }
+
+ private Object getValue(Row row, ColumnDefinitions columnDefinitions, String columnName) {
+ Object paramValue;
+ Field field = mapping.getFieldFromColumnName(columnName);
+ DataType columnType = columnDefinitions.getType(columnName);
+ switch (columnType.getName()) {
+ case ASCII:
+ paramValue = row.getString(columnName);
+ break;
+ case BIGINT:
+ paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
+ break;
+ case BLOB:
+ paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
+ break;
+ case BOOLEAN:
+ paramValue = row.isNull(columnName) ? null : row.getBool(columnName);
+ break;
+ case COUNTER:
+ paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
+ break;
+ case DECIMAL:
+ paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName);
+ break;
+ case DOUBLE:
+ paramValue = row.isNull(columnName) ? null : row.getDouble(columnName);
+ break;
+ case FLOAT:
+ paramValue = row.isNull(columnName) ? null : row.getFloat(columnName);
+ break;
+ case INET:
+ paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString();
+ break;
+ case INT:
+ paramValue = row.isNull(columnName) ? null : row.getInt(columnName);
+ break;
+ case TEXT:
+ paramValue = row.getString(columnName);
+ break;
+ case TIMESTAMP:
+ paramValue = row.isNull(columnName) ? null : row.getDate(columnName);
+ break;
+ case UUID:
+ paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
+ break;
+ case VARCHAR:
+ paramValue = row.getString(columnName);
+ break;
+ case VARINT:
+ paramValue = row.isNull(columnName) ? null : row.getVarint(columnName);
+ break;
+ case TIMEUUID:
+ paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
+ break;
+ case LIST:
+ String dataType = field.getType();
+ dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
+ paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
+ break;
+ case SET:
+ dataType = field.getType();
+ dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
+ paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType));
+ break;
+ case MAP:
+ dataType = field.getType();
+ dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">"));
+ dataType = dataType.split(",")[1];
+ // Avro supports only String for keys
+ paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType));
+ break;
+ case UDT:
+ paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName);
+ break;
+ case TUPLE:
+ paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString();
+ break;
+ case CUSTOM:
+ paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
+ break;
+ default:
+ paramValue = row.getString(columnName);
+ break;
}
+ return paramValue;
}
+/* public Collection<Object> getFieldValues(Object o) {
+ UDTValue udtValue = (UDTValue) o;
+ UserType type = udtValue.getType();
+
+ Collection<Object> values = new ArrayList<Object>(type.size());
+
+ *//* for (UserType.Field field : type) {
+ udtValue.
+ ByteBuffer bytes = udtValue.getBytesUnsafe(field.getName());
+ DataType value = field.getType();
+ for(DataType type1 : value.getTypeArguments()) {
+ type1.
+ }
+ values.add(value);
+ }*//*
+
+ return values;
+ }*/
+
+
private Class getRelevantClassForCassandraDataType(String dataType) {
switch (dataType) {
//// TODO: 7/25/17 support all the datatypes
@@ -254,8 +282,14 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
@Override
public Result execute(DataStore dataStore, Query query) {
List<Object> objectArrayList = new ArrayList<>();
+ String[] fields = query.getFields();
+ if (fields != null) {
+ fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
+ } else {
+ fields = mapping.getAllFieldsIncludingKeys();
+ }
CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
- String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList);
+ String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
ResultSet results;
if (objectArrayList.size() == 0) {
results = client.getSession().execute(cqlQuery);
@@ -266,12 +300,18 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
ColumnDefinitions definitions = results.getColumnDefinitions();
T obj;
K keyObject;
+ CassandraKey cassandraKey = mapping.getCassandraKey();
while (iterator.hasNext()) {
Row row = iterator.next();
obj = cassandraDataStore.newPersistent();
keyObject = cassandraDataStore.newKey();
- populateValuesToPersistent(row, definitions, obj);
- populateValuesToPersistent(row, definitions, (PersistentBase) keyObject);
+ populateValuesToPersistent(row, definitions, obj, fields);
+ if (cassandraKey != null) {
+ populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames());
+ } else {
+ Field key = mapping.getInlinedDefinedPartitionKey();
+ keyObject = (K) getValue(row, definitions, key.getColumnName());
+ }
cassandraResult.addResultElement(keyObject, obj);
}
return cassandraResult;
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
index 4362a04..10c8f68 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
+import java.util.Set;
/**
* This class is used create Cassandra Queries.
@@ -97,29 +97,17 @@ class CassandraQueryFactory {
stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" (");
boolean isCommaNeeded = false;
CassandraKey cassandraKey = mapping.getCassandraKey();
- // appending Cassandra key columns into db schema
- for (Field field : mapping.getFieldList()) {
- if (isCommaNeeded) {
- stringBuffer.append(", ");
- }
- stringBuffer.append(field.getColumnName()).append(" ").append(field.getType());
- boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
- boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
- if (isStaticColumn) {
- stringBuffer.append(" STATIC");
- }
- if (isPrimaryKey) {
- stringBuffer.append(" PRIMARY KEY ");
- }
- isCommaNeeded = true;
- }
+ // appending Cassandra Persistent columns into db schema
+ processFieldsForCreateTableQuery(mapping.getFieldList(), isCommaNeeded, stringBuffer);
if (cassandraKey != null) {
- List<PartitionKeyField> pkey = cassandraKey.getPartitionKeyFields();
- if (pkey != null) {
+ isCommaNeeded = true;
+ processFieldsForCreateTableQuery(cassandraKey.getFieldList(), isCommaNeeded, stringBuffer);
+ List<PartitionKeyField> partitionKeys = cassandraKey.getPartitionKeyFields();
+ if (partitionKeys != null) {
stringBuffer.append(", PRIMARY KEY (");
boolean isCommaNeededToApply = false;
- for (PartitionKeyField keyField : pkey) {
+ for (PartitionKeyField keyField : partitionKeys) {
if (isCommaNeededToApply) {
stringBuffer.append(",");
}
@@ -141,11 +129,6 @@ class CassandraQueryFactory {
}
stringBuffer.append(")");
}
- } else {
- if (!stringBuffer.toString().toLowerCase(Locale.ENGLISH).contains("primary key")) {
- Field field = mapping.getDefaultCassandraKey();
- stringBuffer.append(", ").append(field.getFieldName()).append(" ").append(field.getType()).append(" PRIMARY KEY ");
- }
}
stringBuffer.append(")");
@@ -191,6 +174,24 @@ class CassandraQueryFactory {
return stringBuffer.toString();
}
+ private static void processFieldsForCreateTableQuery(List<Field> fields, boolean isCommaNeeded, StringBuilder stringBuilder) {
+ for (Field field : fields) {
+ if (isCommaNeeded) {
+ stringBuilder.append(", ");
+ }
+ stringBuilder.append(field.getColumnName()).append(" ").append(field.getType());
+ boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static"));
+ boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey"));
+ if (isStaticColumn) {
+ stringBuilder.append(" STATIC");
+ }
+ if (isPrimaryKey) {
+ stringBuilder.append(" PRIMARY KEY ");
+ }
+ isCommaNeeded = true;
+ }
+ }
+
/**
* This method returns the CQL query to drop table.
* refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html
@@ -221,7 +222,7 @@ class CassandraQueryFactory {
* @return CQL query
*/
static String getTruncateTableQuery(CassandraMapping mapping) {
- return "TRUNCATE TABLE " + mapping.getKeySpace().getName() + "." + mapping.getCoreName();
+ return QueryBuilder.truncate(mapping.getKeySpace().getName(), mapping.getCoreName()).getQueryString();
}
/**
@@ -371,9 +372,7 @@ class CassandraQueryFactory {
* @param objects object list
* @return CQL Query
*/
- static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
- String[] fields = cassandraQuery.getFields();
- fields = fields != null ? fields : mapping.getFieldNames();
+ static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) {
Object startKey = cassandraQuery.getStartKey();
Object endKey = cassandraQuery.getEndKey();
Object key = cassandraQuery.getKey();
@@ -470,14 +469,17 @@ class CassandraQueryFactory {
ArrayList<String> columnNames = new ArrayList<>();
for (String field : fields) {
Field fieldBean = mapping.getFieldFromFieldName(field);
+ CassandraKey cassandraKey = mapping.getCassandraKey();
+ Field keyBean = null;
+ if (cassandraKey != null) {
+ keyBean = cassandraKey.getFieldFromFieldName(field);
+ }
if (fieldBean != null) {
columnNames.add(fieldBean.getColumnName());
+ } else if (keyBean != null) {
+ columnNames.add(keyBean.getColumnName());
} else {
- if (mapping.getDefaultCassandraKey().getFieldName().equals(field)) {
- columnNames.add(field);
- } else {
- LOG.warn("{} field is ignored, couldn't find relavant field in the persistent mapping", field);
- }
+ LOG.warn("{} field is ignored, couldn't find relevant field in the persistent mapping", field);
}
}
return columnNames.toArray(new String[0]);
@@ -504,7 +506,7 @@ class CassandraQueryFactory {
*/
static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
String[] columns = null;
- if (!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) {
+ if (cassandraQuery.getFields() != null) {
columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
}
Object startKey = cassandraQuery.getStartKey();
@@ -549,17 +551,17 @@ class CassandraQueryFactory {
String[] columnKeys = getColumnNames(mapping, cassandraKeys);
for (int i = 0; i < cassandraKeys.size(); i++) {
if (isWhereNeeded) {
- query = delete.where(QueryBuilder.gte(columnKeys[i], "?"));
+ query = delete.where(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
objects.add(cassandraValues.get(i));
isWhereNeeded = false;
} else {
- query = query.and(QueryBuilder.gte(columnKeys[i], "?"));
+ query = query.and(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
objects.add(cassandraValues.get(i));
}
}
} else {
primaryKey = getPKey(mapping.getFieldList());
- query = delete.where(QueryBuilder.gte(primaryKey, "?"));
+ query = delete.where(QueryBuilder.gte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
objects.add(startKey);
isWhereNeeded = false;
}
@@ -572,20 +574,20 @@ class CassandraQueryFactory {
String[] columnKeys = getColumnNames(mapping, cassandraKeys);
for (int i = 0; i < cassandraKeys.size(); i++) {
if (isWhereNeeded) {
- query = delete.where(QueryBuilder.lte(columnKeys[i], "?"));
+ query = delete.where(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
objects.add(cassandraValues.get(i));
isWhereNeeded = false;
} else {
- query = query.and(QueryBuilder.lte(columnKeys[i], "?"));
+ query = query.and(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?")));
objects.add(cassandraValues.get(i));
}
}
} else {
primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
if (isWhereNeeded) {
- query = delete.where(QueryBuilder.lte(primaryKey, "?"));
+ query = delete.where(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
} else {
- query = query.and(QueryBuilder.lte(primaryKey, "?"));
+ query = query.and(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?")));
}
objects.add(endKey);
}
@@ -611,7 +613,7 @@ class CassandraQueryFactory {
Update.Assignments updateAssignments = null;
if (cassandraQuery instanceof CassandraQuery) {
String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
- if(CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) {
+ if (CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) {
for (String column : columnNames) {
updateAssignments = update.with(QueryBuilder.set(column, "?"));
objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName()));
@@ -619,12 +621,12 @@ class CassandraQueryFactory {
} else {
for (String column : columnNames) {
updateAssignments = update.with(QueryBuilder.set(column, "?"));
- String field = mapping.getFieldFromColumnName(column).getFieldName();
- Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field);
+ Field field = mapping.getFieldFromColumnName(column);
+ Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field.getFieldName());
try {
Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null);
- Schema schemaField = schema.getField(field).schema();
- objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value));
+ Schema schemaField = schema.getField(field.getFieldName()).schema();
+ objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value, field));
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler.");
} catch (NullPointerException e) {
@@ -717,4 +719,111 @@ class CassandraQueryFactory {
return query.getQueryString();
}
+ /**
+ * This method returns the CREATE TYPE CQL query used to create a user-defined type (UDT).
+ * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlRefcreateType.html
+ *
+ * @param fieldSchema avroSchema {@link Schema}
+ * @param mapping Cassandra mapping {@link CassandraMapping}
+ * @return CQL Query
+ */
+ static String getCreateUDTType(Schema fieldSchema, CassandraMapping mapping, Set<String> udtQueryStack) {
+ StringBuilder stringBuffer = new StringBuilder();
+ if (fieldSchema.getType().equals(Schema.Type.UNION)) {
+ for (Schema fieldTypeSchema : fieldSchema.getTypes()) {
+ if (fieldTypeSchema.getType().equals(Schema.Type.RECORD)) {
+ fieldSchema = fieldTypeSchema;
+ break;
+ }
+ }
+ }
+ stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(fieldSchema.getName()).append(" (");
+ processRecord(fieldSchema, stringBuffer, mapping, udtQueryStack);
+ stringBuffer.append(")");
+ return stringBuffer.toString();
+ }
+
+ private static void processRecord(Schema recordSchema, StringBuilder stringBuilder, CassandraMapping mapping, Set<String> udtQueryStack) {
+ boolean isCommaNeeded = false;
+ for (Schema.Field field : recordSchema.getFields()) {
+ if (isCommaNeeded) {
+ stringBuilder.append(", ");
+ }
+ String fieldName = field.name();
+ stringBuilder.append(fieldName).append(" ");
+ try {
+ populateFieldsToQuery(field.schema(), stringBuilder, mapping, udtQueryStack);
+ isCommaNeeded = true;
+ } catch (Exception e) {
+ int i = stringBuilder.indexOf(fieldName);
+ if (i != -1) {
+ stringBuilder.delete(i, i + fieldName.length());
+ isCommaNeeded = false;
+ }
+ }
+ }
+ }
+
+ private static void populateFieldsToQuery(Schema schema, StringBuilder builder, CassandraMapping mapping, Set<String> udtQueryStack) throws Exception {
+ switch (schema.getType()) {
+ case INT:
+ builder.append("int");
+ break;
+ case MAP:
+ builder.append("map<text,");
+ populateFieldsToQuery(schema.getValueType(), builder, mapping, udtQueryStack);
+ builder.append(">");
+ break;
+ case ARRAY:
+ builder.append("list<");
+ populateFieldsToQuery(schema.getElementType(), builder, mapping, udtQueryStack);
+ builder.append(">");
+ break;
+ case LONG:
+ builder.append("bigint");
+ break;
+ case FLOAT:
+ builder.append("float");
+ break;
+ case DOUBLE:
+ builder.append("double");
+ break;
+ case BOOLEAN:
+ builder.append("boolean");
+ break;
+ case BYTES:
+ builder.append("blob");
+ break;
+ case RECORD:
+ builder.append("frozen<").append(schema.getName()).append(">");
+ String query = getCreateUDTType(schema, mapping, udtQueryStack);
+ udtQueryStack.add(query);
+ break;
+ case STRING:
+ case FIXED:
+ case ENUM:
+ builder.append("text");
+ break;
+ case UNION:
+ for (Schema unionElementSchema : schema.getTypes()) {
+ if (unionElementSchema.getType().equals(Schema.Type.RECORD)) {
+ String recordName = unionElementSchema.getName();
+ if (!builder.toString().contains(recordName)) {
+ builder.append("frozen<").append(recordName).append(">");
+ query = getCreateUDTType(unionElementSchema, mapping, udtQueryStack);
+ udtQueryStack.add(query);
+ } else {
+ LOG.warn("Same Field Type can't be mapped recursively. This is not supported with Cassandra UDT types, Please use byte dataType for recursive mapping.");
+ throw new Exception("Same Field Type has mapped recursively");
+ }
+ break;
+ } else if (!unionElementSchema.getType().equals(Schema.Type.NULL)) {
+ populateFieldsToQuery(unionElementSchema, builder, mapping, udtQueryStack);
+ break;
+ }
+ }
+ break;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
index ac4da42..17e0568 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -20,6 +20,7 @@ package org.apache.gora.cassandra.serializers;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.TableMetadata;
+import org.apache.avro.Schema;
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.store.CassandraClient;
import org.apache.gora.cassandra.store.CassandraMapping;
@@ -32,22 +33,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
+import java.util.Set;
/**
* This is the abstract Cassandra Serializer class.
*/
public abstract class CassandraSerializer<K, T extends Persistent> {
- CassandraClient client;
-
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
protected Class<K> keyClass;
protected Class<T> persistentClass;
protected CassandraMapping mapping;
-
- private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
+ CassandraClient client;
CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) {
this.keyClass = keyClass;
@@ -56,9 +57,35 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
this.mapping = mapping;
}
+ /**
+ * This method returns the Cassandra Serializer according to the Cassandra serializer property.
+ *
+ * @param cc Cassandra Client
+ * @param type Serialization type
+ * @param dataStore Cassandra DataStore
+ * @param mapping Cassandra Mapping
+ * @param <K> key class
+ * @param <T> persistent class
+ * @return Serializer
+ */
+ public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) {
+ CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
+ CassandraSerializer serializer;
+ switch (serType) {
+ case AVRO:
+ serializer = new AvroSerializer(cc, dataStore, mapping);
+ break;
+ case NATIVE:
+ default:
+ serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+ }
+ return serializer;
+ }
+
public void createSchema() {
LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName());
this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
+ processUDTSchemas(); //TODO complete functionality
LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName());
this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping));
}
@@ -89,29 +116,30 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
}
}
- /**
- * This method returns the Cassandra Serializer according the Cassandra serializer property.
- *
- * @param cc Cassandra Client
- * @param type Serialization type
- * @param dataStore Cassandra DataStore
- * @param mapping Cassandra Mapping
- * @param <K> key class
- * @param <T> persistent class
- * @return Serializer
- */
- public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K,T> dataStore, CassandraMapping mapping) {
- CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
- CassandraSerializer serializer;
- switch (serType) {
- case AVRO:
- serializer = new AvroSerializer(cc, dataStore, mapping);
- break;
- case NATIVE:
- default:
- serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+ private void processUDTSchemas() {
+ Set<String> schemaStack = new LinkedHashSet<>();
+ for (Field field : mapping.getFieldList()) {
+ if (field.getType().contains("frozen")) {
+ try {
+ Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null);
+ Schema schemaField = schema.getField(field.getFieldName()).schema();
+ String cqlQuery = CassandraQueryFactory.getCreateUDTType(schemaField, mapping, schemaStack);
+ schemaStack.add(cqlQuery);
+ } catch (IllegalAccessException | NoSuchFieldException e) {
+ throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler.");
+ } catch (NullPointerException e) {
+ throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + ".");
+ }
+ }
+ }
+ createUserDefineTypes(schemaStack);
+
+ }
+
+ private void createUserDefineTypes(Set<String> queries) {
+ for (String cqlQuery : queries) {
+ this.client.getSession().execute(cqlQuery);
}
- return serializer;
}
protected String[] getFields() {
@@ -146,14 +174,22 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
public long deleteByQuery(Query query) {
List<Object> objectArrayList = new ArrayList<>();
- String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
- ResultSet results;
- if (objectArrayList.size() == 0) {
- results = client.getSession().execute(cqlQuery);
+ if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) {
+ if (query.getFields() == null) {
+ client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+ } else {
+ LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields.");
+ }
} else {
- results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+ String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList);
+ ResultSet results;
+ if (objectArrayList.size() == 0) {
+ results = client.getSession().execute(cqlQuery);
+ } else {
+ results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+ }
+ LOG.debug("Delete by Query was applied : " + results.wasApplied());
}
- LOG.debug("Delete by Query was applied : " + results.wasApplied());
LOG.info("Delete By Query method doesn't return the deleted element count.");
return 0;
}
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
index 4695498..f8bb066 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -21,6 +21,7 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.Result;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
import org.apache.gora.cassandra.query.CassandraResultSet;
@@ -46,16 +47,23 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
private Mapper<T> mapper;
+ NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
+ super(cassandraClient, keyClass, persistentClass, mapping);
+ this.createSchema();
+ MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
+ mapper = mappingManager.mapper(persistentClass);
+ }
+
@Override
public void put(Object key, Persistent value) {
- LOG.debug("Object is saved with key : {} and value : {}",key,value);
+ LOG.debug("Object is saved with key : {} and value : {}", key, value);
mapper.save((T) value);
}
@Override
public T get(Object key) {
T object = mapper.get(key);
- if(object != null) {
+ if (object != null) {
LOG.debug("Object is found for key : {}", key);
} else {
LOG.debug("Object is not found for key : {}", key);
@@ -72,7 +80,7 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
@Override
public Persistent get(Object key, String[] fields) {
- if(fields == null) {
+ if (fields == null) {
fields = getFields();
}
String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields);
@@ -83,15 +91,21 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
LOG.debug("Object is found for key : {}", key);
return objectList.get(0);
}
- LOG.debug("Object is not found for key : {}" , key);
+ LOG.debug("Object is not found for key : {}", key);
return null;
}
@Override
public org.apache.gora.query.Result execute(DataStore dataStore, Query query) {
List<Object> objectArrayList = new ArrayList<>();
+ String[] fields = query.getFields();
+ if (fields != null) {
+ fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
+ } else {
+ fields = mapping.getAllFieldsIncludingKeys();
+ }
CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
- String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList);
+ String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
ResultSet results;
if (objectArrayList.size() == 0) {
results = client.getSession().execute(cqlQuery);
@@ -108,13 +122,6 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
return cassandraResult;
}
- NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
- super(cassandraClient, keyClass, persistentClass, mapping);
- this.createSchema();
- MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
- mapper = mappingManager.mapper(persistentClass);
- }
-
private K getKey(T object) {
String keyField = null;
for (Field field : mapping.getFieldList()) {
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
index 5d22d94..ce1e3e7 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
@@ -5,15 +5,16 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
/**
* This package contains Cassandra store related util classes for serializer.
*/
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
index 196f6a3..c973fe4 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.gora.cassandra.store;
import com.datastax.driver.core.Cluster;
@@ -53,6 +70,8 @@ public class CassandraClient {
private Cluster cluster;
+ private Session session;
+ private CassandraMapping mapping;
public Session getSession() {
return session;
@@ -62,11 +81,6 @@ public class CassandraClient {
return cluster;
}
- private Session session;
-
- private CassandraMapping mapping;
-
-
void initialize(Properties properties, CassandraMapping mapping) throws Exception {
Cluster.Builder builder = Cluster.builder();
List<String> codecs = readCustomCodec(properties);
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
index 5699355..ac46a30 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
@@ -31,54 +31,80 @@ import java.util.Map;
*/
public class CassandraMapping {
+ private static final String PRIMARY_KEY = "primarykey";
private CassandraKey cassandraKey;
-
private Map<String, String> tableProperties;
-
private Class keyClass;
-
private Class persistentClass;
-
private KeySpace keySpace;
-
private List<Field> fieldList;
-
- private List<Field> inlinedDefinedPartitionKeys;
-
- private static final String PRIMARY_KEY = "primarykey";
-
+ private Field inlinedDefinedPartitionKey;
private String coreName;
+ /**
+ * Constructor of the class
+ */
+ CassandraMapping() {
+ this.fieldList = new ArrayList<>();
+ this.tableProperties = new HashMap<>();
+ }
+
+ /**
+ * This method returns the KeySpace in the mapping file.
+ * @return Key space {@link KeySpace}
+ */
public KeySpace getKeySpace() {
return keySpace;
}
- public void setKeySpace(KeySpace keySpace) {
+ /**
+ * This method sets the KeySpace in the Cassandra mapping.
+ * @param keySpace Key space {@link KeySpace}
+ */
+ void setKeySpace(KeySpace keySpace) {
this.keySpace = keySpace;
}
+ /**
+ * This method returns only the fields which belong to the Persistent Object.
+ * @return List of Fields
+ */
public List<Field> getFieldList() {
return fieldList;
}
+ /**
+ * This method returns the Persistent Object's Field from the mapping, according to the FieldName.
+ * @param fieldName Field Name
+ * @return Field {@link Field}
+ */
public Field getFieldFromFieldName(String fieldName) {
for (Field field1 : fieldList) {
- if (field1.getFieldName().equals(fieldName)) {
+ if (field1.getFieldName().equalsIgnoreCase(fieldName)) {
return field1;
}
}
return null;
}
+ /**
+ * This method returns the Persistent Object's Field from the mapping, according to the ColumnName.
+ * @param columnName Column Name
+ * @return Field {@link Field}
+ */
public Field getFieldFromColumnName(String columnName) {
for (Field field1 : fieldList) {
- if (field1.getColumnName().equals(columnName)) {
+ if (field1.getColumnName().equalsIgnoreCase(columnName)) {
return field1;
}
}
return null;
}
+ /**
+ * This method returns the Field Names
+ * @return array of Field Names
+ */
public String[] getFieldNames() {
List<String> fieldNames = new ArrayList<>(fieldList.size());
for (Field field : fieldList) {
@@ -88,60 +114,79 @@ public class CassandraMapping {
return fieldNames.toArray(fieldNameArray);
}
+ /**
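+ * This method returns the names of all fields, including the fields of the Cassandra key when one is defined.
+ * @return array of Field Names, including the key Field Names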
+ */
+ public String[] getAllFieldsIncludingKeys() {
+ List<String> fieldNames = new ArrayList<>(fieldList.size());
+ for (Field field : fieldList) {
+ fieldNames.add(field.getFieldName());
+ }
+ if (cassandraKey != null) {
+ for (Field field : cassandraKey.getFieldList()) {
+ fieldNames.add(field.getFieldName());
+ }
+ }
+ String[] fieldNameArray = new String[fieldNames.size()];
+ return fieldNames.toArray(fieldNameArray);
+ }
+
+ /**
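+ * This method returns the field names of the Cassandra key when one is defined; otherwise the field name of the inlined defined partition key.
+ * @return array of key Field Names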
+ */
+ public String[] getAllKeys() {
+ List<String> fieldNames = new ArrayList<>();
+ Field keyField = getInlinedDefinedPartitionKey();
+ if (cassandraKey != null) {
+ for (Field field : cassandraKey.getFieldList()) {
+ fieldNames.add(field.getFieldName());
+ }
+ } else {
+ fieldNames.add(keyField.getFieldName());
+ }
+ String[] fieldNameArray = new String[fieldNames.size()];
+ return fieldNames.toArray(fieldNameArray);
+ }
+
public CassandraKey getCassandraKey() {
return cassandraKey;
}
void setCassandraKey(CassandraKey cassandraKey) {
this.cassandraKey = cassandraKey;
- this.fieldList.addAll(cassandraKey.getFieldList());
}
- CassandraMapping() {
- this.fieldList = new ArrayList<>();
- this.tableProperties = new HashMap<>();
+ public String getCoreName() {
+ return coreName;
}
- public void setCoreName(String coreName) {
+ void setCoreName(String coreName) {
this.coreName = coreName;
}
- public String getCoreName() {
- return coreName;
- }
-
- public void addCassandraField(Field field) {
+ void addCassandraField(Field field) {
this.fieldList.add(field);
}
- public void addProperty(String key, String value) {
- this.tableProperties.put(key,value);
+ void addProperty(String key, String value) {
+ this.tableProperties.put(key, value);
}
public String getProperty(String key) {
return this.tableProperties.get(key);
}
- public Field getDefaultCassandraKey() {
+ private Field getDefaultCassandraKey() {
Field field = new Field();
field.setFieldName("defaultId");
field.setColumnName("defaultId");
- field.setType("text");
+ field.setType("varchar");
+ field.addProperty("primarykey", "true");
return field;
}
- public boolean isPartitionKeyDefined() {
- if (cassandraKey == null) {
- for (Field field : fieldList) {
- if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) {
- return true;
- }
- }
- return false;
- }
- return true;
- }
-
public Class getKeyClass() {
return keyClass;
}
@@ -154,21 +199,38 @@ public class CassandraMapping {
return persistentClass;
}
- public void setPersistentClass(Class persistentClass) {
+ void setPersistentClass(Class persistentClass) {
this.persistentClass = persistentClass;
}
- public List<Field> getInlinedDefinedPartitionKeys() {
- if(inlinedDefinedPartitionKeys != null) {
- return inlinedDefinedPartitionKeys;
+ /**
+ * This method returns the inlined defined Partition Key.
+ * If there isn't any inlined defined partition key,
+ * this method returns the default predefined partition key "defaultId".
+ *
+ * @return Partition Key {@link Field}
+ */
+ public Field getInlinedDefinedPartitionKey() {
+ if (inlinedDefinedPartitionKey != null) {
+ return inlinedDefinedPartitionKey;
} else {
- inlinedDefinedPartitionKeys = new ArrayList<>();
for (Field field : fieldList) {
if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) {
- inlinedDefinedPartitionKeys.add(field);
+ inlinedDefinedPartitionKey = field;
+ break;
}
}
- return inlinedDefinedPartitionKeys;
+ if (inlinedDefinedPartitionKey == null) {
+ return getDefaultCassandraKey();
+ }
+ return inlinedDefinedPartitionKey;
+ }
+ }
+
+ void finalized() {
+ Field field = getInlinedDefinedPartitionKey();
+ if (!fieldList.contains(field) && cassandraKey == null) {
+ fieldList.add(field);
}
}
}
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
index c501cc5..f151458 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.gora.cassandra.store;
import org.apache.gora.cassandra.bean.CassandraKey;
@@ -203,6 +220,7 @@ class CassandraMappingBuilder<K, T extends Persistent> {
} catch (Exception ex) {
throw new IOException(ex);
}
+ cassandraMapping.finalized();
return cassandraMapping;
}
@@ -219,6 +237,10 @@ class CassandraMappingBuilder<K, T extends Persistent> {
fieldKey.setColumnName(attributeValue);
break;
case "type":
+ // replace UDT into frozen
+ if (attributeValue.contains("udt(")) {
+ attributeValue = attributeValue.replace("udt(", "frozen(");
+ }
fieldKey.setType(attributeValue.replace("(", "<").replace(")", ">"));
break;
default:
@@ -229,8 +251,8 @@ class CassandraMappingBuilder<K, T extends Persistent> {
}
private int getReplicationFactor(Element element) {
- String value = element.getAttributeValue("replication_factor");
- if(value == null) {
+ String value = element.getAttributeValue("replication_factor");
+ if (value == null) {
return 1;
} else {
return Integer.parseInt(value);
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index 74d3862..c481610 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -17,11 +17,13 @@
package org.apache.gora.cassandra.store;
+import org.apache.avro.data.RecordBuilder;
import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.serializers.CassandraSerializer;
import org.apache.gora.persistency.BeanFactory;
import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
@@ -58,16 +60,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
private CassandraSerializer cassandraSerializer;
- public enum SerializerType {
- AVRO("AVRO"), NATIVE("NATIVE");
- String val;
-
- SerializerType(String v) {
- this.val = v;
- }
- }
-
-
public CassandraStore() {
super();
}
@@ -96,6 +88,12 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
}
}
+ @SuppressWarnings("all")
+ @Override
+ public Class<T> getPersistentClass() {
+ return (Class<T>) this.persistentClass;
+ }
+
/**
* {@inheritDoc}
* <p>
@@ -110,12 +108,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
this.persistentClass = persistentClass;
}
- @SuppressWarnings("all")
- @Override
- public Class<T> getPersistentClass() {
- return (Class<T>) this.persistentClass;
- }
-
@Override
public String getSchemaName() {
return mapping.getCoreName();
@@ -148,7 +140,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
if (beanFactory != null) {
return beanFactory.newKey();
} else {
- LOG.warn("beanFactory is hasn't been initialized. It's recommended to initialize beanFactory.");
return keyClass.newInstance();
}
} catch (Exception ex) {
@@ -162,8 +153,10 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
try {
if (beanFactory != null) {
return this.beanFactory.newPersistent();
+ } else if (PersistentBase.class.isAssignableFrom(persistentClass)) {
+ RecordBuilder builder = (RecordBuilder) persistentClass.getMethod("newBuilder").invoke(null);
+ return (T) RecordBuilder.class.getMethod("build").invoke(builder);
} else {
- LOG.warn("beanFactory is hasn't been initialized. It's recommended to initialize beanFactory.");
return persistentClass.newInstance();
}
} catch (Exception ex) {
@@ -171,10 +164,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
}
}
- @Override
- public void setBeanFactory(BeanFactory<K, T> beanFactory) {
- this.beanFactory = beanFactory;
- }
@Override
public BeanFactory<K, T> getBeanFactory() {
@@ -182,6 +171,11 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
}
@Override
+ public void setBeanFactory(BeanFactory<K, T> beanFactory) {
+ this.beanFactory = beanFactory;
+ }
+
+ @Override
public void close() {
this.cassandraSerializer.close();
}
@@ -229,7 +223,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
@Override
public Query<K, T> newQuery() {
Query<K, T> query = new CassandraQuery(this);
- query.setFields(mapping.getFieldNames());
return query;
}
@@ -262,4 +255,13 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
return cassandraSerializer.schemaExists();
}
+ public enum SerializerType {
+ AVRO("AVRO"), NATIVE("NATIVE");
+ String val;
+
+ SerializerType(String v) {
+ this.val = v;
+ }
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
index 95e1c0f..bb758f6 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
@@ -28,13 +28,13 @@ public class CassandraStoreParameters {
*/
public static final String CASSANDRA_SERVERS = "gora.cassandrastore.cassandraServers";
/**
- *Property pointing to the Cassandra keyspace.
+ * Property pointing to the Cassandra keyspace.
* string
*/
public static final String KEYSPACE = "gora.cassandrastore.keyspace";
/**
- * Property pointing to the port to use to connect to the Cassandra hosts.
- * integer
+ * Property pointing to the port to use to connect to the Cassandra hosts.
+ * integer
*/
public static final String PORT = "gora.cassandrastore.port";
@@ -97,7 +97,7 @@ public class CassandraStoreParameters {
* Property pointing to set local host new connection threshold.
* integer
*/
- public static final String LOCAL_NEW_CONNECTION_THRESHOLD= "gora.cassandrastore.localNewConnectionThreshold";
+ public static final String LOCAL_NEW_CONNECTION_THRESHOLD = "gora.cassandrastore.localNewConnectionThreshold";
/**
* Property pointing to set remote host new connection threshold.
* integer
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java
index e6d0176..2cd9003 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java
@@ -5,15 +5,16 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
/**
* This package contains all the Cassandra store related classes.
*/