You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by al...@apache.org on 2011/07/22 02:34:02 UTC
svn commit: r1149420 - in /incubator/gora/trunk: gora-cassandra/ivy/
gora-cassandra/lib-ext/
gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
gora-cassandra/src/main/java/org/apache/gora/cassandra/query/
gora-cassandra/src/main/java/org/a...
Author: alexis
Date: Fri Jul 22 00:33:59 2011
New Revision: 1149420
URL: http://svn.apache.org/viewvc?rev=1149420&view=rev
Log:
Cassandra 0.8 backend with Hector client
Added:
incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar (with props)
incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar (with props)
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
Removed:
incubator/gora/trunk/gora-cassandra/lib-ext/apache-cassandra-0.6.4.jar
incubator/gora/trunk/gora-cassandra/lib-ext/libthrift-r917130.jar
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraColumn.java
Modified:
incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
Modified: incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/ivy/ivy.xml?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/ivy/ivy.xml (original)
+++ incubator/gora/trunk/gora-cassandra/ivy/ivy.xml Fri Jul 22 00:33:59 2011
@@ -22,7 +22,7 @@
organisation="org.apache.gora"
module="gora-cassandra"
status="integration"/>
-
+
<configurations>
<include file="${project.dir}/ivy/ivy-configurations.xml"/>
</configurations>
@@ -32,15 +32,25 @@
<artifact name="gora-cassandra-test" conf="test"/>
</publications>
+
<dependencies>
<!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
- <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
- <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
-
- <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.8" conf="*->master"/>
- <dependency org="org.slf4j" name="slf4j-api" rev="1.5.8" conf="*->master"/>
-
- <dependency org="com.google.guava" name="guava" rev="r06"/>
+
+ <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
+
+ <dependency org="org.jdom" name="jdom" rev="1.1">
+ <exclude org="xerces" name="xercesImpl"/>
+ </dependency>
+
+ <!--
+ <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.8.1"/>
+ <dependency org="me.prettyprint" name="hector" rev="0.8.0-1"/>
+ -->
+ <dependency org="org.apache.cassandra" name="cassandra-thrift" rev="0.8.1"/>
+ <dependency org="com.ecyrd.speed4j" name="speed4j" rev="0.9" conf="*->*,!javadoc,!sources"/>
+ <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="1.1.2" conf="*->*,!javadoc,!sources"/>
+ <dependency org="com.google.collections" name="google-collections" rev="1.0" conf="*->*,!javadoc,!sources"/>
+ <dependency org="com.google.guava" name="guava" rev="r09" conf="*->*,!javadoc,!sources"/>
<!-- test dependencies -->
Added: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar?rev=1149420&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar?rev=1149420&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.query;
+
+import org.apache.avro.Schema.Field;
+
+/**
+ * Represents a unit of data read from Cassandra: a named value tagged by a
+ * column family name. Subclasses supply the name and the decoded value;
+ * the Avro {@link Field} set via {@link #setField(Field)} is available to
+ * them through {@link #getField()} for decoding.
+ */
+public abstract class CassandraColumn {
+  /** Type tag for sub (regular) columns; see {@link #setType(int)}. */
+  public static final int SUB = 0;
+  /** Type tag for super columns; see {@link #setType(int)}. */
+  public static final int SUPER = 1;
+
+  private String family;
+  private int type;
+  private Field field;
+
+  /** @return the column family this column belongs to */
+  public String getFamily() {
+    return family;
+  }
+
+  /** @param family the column family this column belongs to */
+  public void setFamily(String family) {
+    this.family = family;
+  }
+
+  /** @return the type tag; {@link #SUB} or {@link #SUPER} by convention (not validated) */
+  public int getType() {
+    return type;
+  }
+
+  /** @param type the type tag, expected to be {@link #SUB} or {@link #SUPER} (not validated) */
+  public void setType(int type) {
+    this.type = type;
+  }
+
+  /** @param field the Avro field this column is mapped to */
+  public void setField(Field field) {
+    this.field = field;
+  }
+
+  /** @return the Avro field set by {@link #setField(Field)}, or {@code null} if none was set */
+  protected Field getField() {
+    return this.field;
+  }
+
+  /** @return the Cassandra column name */
+  public abstract String getName();
+
+  /** @return the column value, decoded for the mapped Avro field */
+  public abstract Object getValue();
+}
Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java Fri Jul 22 00:33:59 2011
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gora.cassandra.query;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.query.Query;
-import org.apache.gora.query.impl.PartitionQueryImpl;
-import org.apache.hadoop.io.Text;
-
-public class CassandraPartitionQuery<K, T extends Persistent>
-extends PartitionQueryImpl<K, T> {
-
- private String startToken;
-
- private String endToken;
-
- private String[] endPoints;
-
- private int splitSize;
-
- public CassandraPartitionQuery() {
- this.dataStore = null;
- }
-
- public CassandraPartitionQuery(Query<K, T> baseQuery, String startToken, String endToken, String[] endPoints,
- int splitSize) {
- super(baseQuery);
- this.startToken = startToken;
- this.endToken = endToken;
- this.endPoints = endPoints;
- this.splitSize = splitSize;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- Text.writeString(out, startToken);
- Text.writeString(out, endToken);
- out.writeInt(endPoints.length);
- for (String endPoint : endPoints) {
- Text.writeString(out, endPoint);
- }
- out.writeInt(splitSize);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- startToken = Text.readString(in);
- endToken = Text.readString(in);
- int size = in.readInt();
- endPoints = new String[size];
- for (int i = 0; i < size; i++) {
- endPoints[i] = Text.readString(in);
- }
- splitSize = in.readInt();
- }
-
- public String getStartToken() {
- return startToken;
- }
-
- public String getEndToken() {
- return endToken;
- }
-
- public String[] getEndPoints() {
- return endPoints;
- }
-
- public int getSplitSize() {
- return splitSize;
- }
-}
Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java Fri Jul 22 00:33:59 2011
@@ -1,34 +1,55 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.gora.cassandra.query;
+import java.util.List;
+import java.util.Map;
+
import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
import org.apache.gora.query.impl.QueryBase;
import org.apache.gora.store.DataStore;
-public class CassandraQuery<K, T extends Persistent>
-extends QueryBase<K, T> {
+public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
+ private Query<K, T> query;
+
+ /**
+ * Maps Avro fields to Cassandra columns.
+ */
+ private Map<String, List<String>> familyMap;
+
public CassandraQuery() {
super(null);
}
-
public CassandraQuery(DataStore<K, T> dataStore) {
super(dataStore);
}
+ public void setFamilyMap(Map<String, List<String>> familyMap) {
+ this.familyMap = familyMap;
+ }
+ public Map<String, List<String>> getFamilyMap() {
+ return familyMap;
+ }
+
+ /**
+ * @param family the family name
+ * @return an array of the query column names belonging to the family
+ */
+ public String[] getColumns(String family) {
+
+ List<String> columnList = familyMap.get(family);
+ String[] columns = new String[columnList.size()];
+ for (int i = 0; i < columns.length; ++i) {
+ columns[i] = columnList.get(i);
+ }
+ return columns;
+ }
+ public Query<K, T> getQuery() {
+ return query;
+ }
+ public void setQuery(Query<K, T> query) {
+ this.query = query;
+ }
+
+
+
}
Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 22 00:33:59 2011
@@ -1,157 +1,97 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.gora.cassandra.query;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
-
-import org.apache.gora.cassandra.client.CassandraClient;
-import org.apache.gora.cassandra.client.Row;
-import org.apache.gora.cassandra.client.Select;
-import org.apache.gora.cassandra.store.CassandraStore;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.ResultBase;
import org.apache.gora.store.DataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class CassandraResult<K, T extends Persistent>
-extends ResultBase<K, T> {
-
- private Iterator<Row> rowIter;
-
- private CassandraStore<K, T> store;
+public class CassandraResult<K, T extends Persistent> extends ResultBase<K, T> {
+ public static final Logger LOG = LoggerFactory.getLogger(CassandraResult.class);
+
+ private int rowNumber;
+
+ private CassandraResultSet cassandraResultSet;
+
+ /**
+ * Maps Cassandra columns to Avro fields.
+ */
+ private Map<String, String> reverseMap;
- private String[] fields;
-
- public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query,
- int batchRowCount) throws IOException {
+ public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query) {
super(dataStore, query);
-
- store = (CassandraStore<K, T>) dataStore;
- fields = query.getFields();
-
- boolean isUsingTokens = (query instanceof CassandraPartitionQuery);
- String startTokenOrKey;
- String endTokenOrKey;
-
- if (isUsingTokens) {
- CassandraPartitionQuery<K, T> partitionQuery = (CassandraPartitionQuery<K, T>) query;
- startTokenOrKey = partitionQuery.getStartToken();
- endTokenOrKey = partitionQuery.getEndToken();
- } else {
- CassandraQuery<K, T> cassandraQuery = (CassandraQuery<K, T>) query;
- startTokenOrKey = cassandraQuery.getStartKey().toString();
- endTokenOrKey = cassandraQuery.getEndKey().toString();
- }
-
- Select select = store.createSelect(fields);
-
- CassandraClient client = store.getClientByLocation(getLocation(query));
- if (isUsingTokens) {
- rowIter =
- client.getTokenRange(startTokenOrKey, endTokenOrKey,
- batchRowCount, select).iterator();
- } else {
- rowIter = client.getRange(startTokenOrKey, endTokenOrKey,
- batchRowCount, select).iterator();
- }
- }
-
- @Override
- public float getProgress() throws IOException {
- return 0;
}
@Override
protected boolean nextInner() throws IOException {
- if (!rowIter.hasNext()) {
- return false;
- }
- Row row = rowIter.next();
- if (row == null) {
- return false;
+ if (this.rowNumber < this.cassandraResultSet.size()) {
+ updatePersistent();
}
-
- key = toKey(row.getKey());
- persistent = store.newInstance(row, fields);
-
- return true;
+ ++this.rowNumber;
+ return (this.rowNumber <= this.cassandraResultSet.size());
}
+
+ /**
+ * Load key/value pair from Cassandra row to Avro record.
+ * @throws IOException
+ */
@SuppressWarnings("unchecked")
- private K toKey(String keyStr) {
- Class<K> keyClass = dataStore.getKeyClass();
- if (keyClass.isAssignableFrom(String.class)) {
- return (K) keyStr;
- }
- if (keyClass.isAssignableFrom(Integer.class)) {
- return (K) (Integer) Integer.parseInt(keyStr);
- }
- if (keyClass.isAssignableFrom(Float.class)) {
- return (K) (Float) Float.parseFloat(keyStr);
- }
- if (keyClass.isAssignableFrom(Double.class)) {
- return (K) (Double) Double.parseDouble(keyStr);
- }
- if (keyClass.isAssignableFrom(Long.class)) {
- return (K) (Long) Long.parseLong(keyStr);
- }
- if (keyClass.isAssignableFrom(Short.class)) {
- return (K) (Short) Short.parseShort(keyStr);
- }
- if (keyClass.isAssignableFrom(Byte.class)) {
- return (K) (Byte) Byte.parseByte(keyStr);
+ private void updatePersistent() throws IOException {
+ CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
+
+ // load key
+ this.key = (K) cassandraRow.getKey();
+
+ // load value
+ Schema schema = this.persistent.getSchema();
+ List<Field> fields = schema.getFields();
+
+ for (CassandraColumn cassandraColumn: cassandraRow) {
+
+ // get field name
+ String family = cassandraColumn.getFamily();
+ String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
+
+ // get field
+ int pos = this.persistent.getFieldIndex(fieldName);
+ Field field = fields.get(pos);
+
+ // get value
+ cassandraColumn.setField(field);
+ Object value = cassandraColumn.getValue();
+
+ this.persistent.put(pos, value);
+ // this field does not need to be written back to the store
+ this.persistent.clearDirty(pos);
}
- throw new RuntimeException("Can't parse " + keyStr +
- " as an instance of " + keyClass);
}
@Override
- public void close() throws IOException { }
+ public void close() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
- private String getLocation(Query<K, T> query) {
- if (!(query instanceof CassandraPartitionQuery)) {
- return null;
- }
- CassandraPartitionQuery<K, T> partitonQuery =
- (CassandraPartitionQuery<K, T>) query;
- InetAddress[] localAddresses = new InetAddress[0];
- try {
- localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
- } catch (UnknownHostException e) {
- throw new AssertionError(e);
- }
- for (InetAddress address : localAddresses) {
- for (String location : partitonQuery.getEndPoints()) {
- InetAddress locationAddress = null;
- try {
- locationAddress = InetAddress.getByName(location);
- } catch (UnknownHostException e) {
- throw new AssertionError(e);
- }
- if (address.equals(locationAddress)) {
- return location;
- }
- }
- }
- return partitonQuery.getEndPoints()[0];
+ @Override
+ public float getProgress() throws IOException {
+ return (((float) this.rowNumber) / this.cassandraResultSet.size());
}
-}
\ No newline at end of file
+
+ public void setResultSet(CassandraResultSet cassandraResultSet) {
+ this.cassandraResultSet = cassandraResultSet;
+ }
+
+ public void setReverseMap(Map<String, String> reverseMap) {
+ this.reverseMap = reverseMap;
+ }
+
+}
Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * List data structure keeping the order of the rows coming from the Cassandra
+ * selects, with an index to look rows up by key.
+ *
+ * <p>The key index is maintained only by {@link #putRow(String, CassandraRow)}
+ * and {@link #clear()}; rows added or removed through other {@link java.util.List}
+ * methods are not visible to {@link #getRow(String)}.
+ */
+public class CassandraResultSet extends ArrayList<CassandraRow> {
+
+  private static final long serialVersionUID = -7620939600192859652L;
+
+  /**
+   * Maps row keys to indices in the list.
+   */
+  private final Map<String, Integer> indexMap = new HashMap<String, Integer>();
+
+  /**
+   * @param key the row key
+   * @return the row stored under {@code key}, or {@code null} if there is none
+   */
+  public CassandraRow getRow(String key) {
+    Integer index = this.indexMap.get(key);
+    if (index == null) {
+      return null;
+    }
+    return this.get(index);
+  }
+
+  /**
+   * Appends a row and indexes it under {@code key}. If {@code key} was already
+   * indexed, the index now points at the new row; the old row stays in the list.
+   *
+   * @param key the row key
+   * @param cassandraRow the row to append
+   */
+  public void putRow(String key, CassandraRow cassandraRow) {
+    this.add(cassandraRow);
+    this.indexMap.put(key, this.size() - 1);
+  }
+
+  /**
+   * Removes all rows and resets the key index, so {@link #getRow(String)}
+   * returns {@code null} afterwards instead of failing on a stale index.
+   */
+  @Override
+  public void clear() {
+    super.clear();
+    this.indexMap.clear();
+  }
+}
Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.query;
+
+import java.util.ArrayList;
+
+/**
+ * List of columns ({@link CassandraColumn}) making up a row, tagged by the row key.
+ */
+public class CassandraRow extends ArrayList<CassandraColumn> {
+
+  private static final long serialVersionUID = -7620939600192859652L;
+
+  /** The row key. */
+  private String key;
+
+  /** @return the row key, or {@code null} if not set */
+  public String getKey() {
+    return this.key;
+  }
+
+  /** @param key the row key */
+  public void setKey(String key) {
+    this.key = key;
+  }
+}
Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.query;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import me.prettyprint.hector.api.beans.HColumn;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A regular (non-super) Cassandra column holding a String value, decoded
+ * into the Java/Avro representation of the mapped Avro field.
+ */
+public class CassandraSubColumn extends CassandraColumn {
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
+
+  private static final String ENCODING = "UTF-8";
+
+  /**
+   * Charset used to encode BYTES values. Only the Charset is shared: a
+   * CharsetEncoder is stateful and must not be used by several threads at
+   * once, so a new one is created for every call.
+   */
+  private static final Charset CHARSET = Charset.forName(ENCODING);
+
+  /**
+   * Key-value pair containing the raw data.
+   */
+  private HColumn<String, String> hColumn;
+
+  @Override
+  public String getName() {
+    return hColumn.getName();
+  }
+
+  /**
+   * Deserializes the raw String value into a typed Object, according to the
+   * schema of the field set via {@link #setField(Field)}.
+   *
+   * <p>Handled types: STRING ({@link Utf8}), BYTES ({@link ByteBuffer}), INT,
+   * LONG, FLOAT, and ARRAY of STRING (expected in the {@code "[a, b, c]"} form).
+   * Other types are logged and decoded as {@code null}.
+   *
+   * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
+   */
+  @Override
+  public Object getValue() {
+    Field field = getField();
+    Schema fieldSchema = field.schema();
+    Type type = fieldSchema.getType();
+    String valueString = hColumn.getValue();
+    Object value = null;
+
+    switch (type) {
+      case STRING:
+        value = new Utf8(valueString);
+        break;
+      case BYTES:
+        value = getByteBuffer(valueString);
+        break;
+      case INT:
+        value = Integer.parseInt(valueString);
+        break;
+      case LONG:
+        value = Long.parseLong(valueString);
+        break;
+      case FLOAT:
+        value = Float.parseFloat(valueString);
+        break;
+      case ARRAY:
+        Type elementType = fieldSchema.getElementType().getType();
+        if (elementType == Schema.Type.STRING) {
+          String[] elements = splitArrayString(valueString);
+          GenericArray<String> genericArray = new GenericData.Array<String>(elements.length,
+              Schema.createArray(Schema.create(Schema.Type.STRING)));
+          for (String element : elements) {
+            genericArray.add(element);
+          }
+          value = genericArray;
+        } else {
+          LOG.info("Element type not supported: " + elementType);
+        }
+        break;
+      default:
+        LOG.info("Type not supported: " + type);
+    }
+
+    return value;
+  }
+
+  /**
+   * @param hColumn the raw column read from Cassandra
+   */
+  public void setValue(HColumn<String, String> hColumn) {
+    this.hColumn = hColumn;
+  }
+
+  /**
+   * Encodes a String as a UTF-8 ByteBuffer.
+   *
+   * @param valueString the string to encode
+   * @return the encoded bytes, or {@code null} if the string cannot be encoded
+   */
+  public static ByteBuffer getByteBuffer(String valueString) {
+    ByteBuffer byteBuffer = null;
+    try {
+      byteBuffer = CHARSET.newEncoder().encode(CharBuffer.wrap(valueString));
+    } catch (CharacterCodingException cce) {
+      LOG.warn("Unable to encode " + valueString + " into " + ENCODING, cce);
+    }
+    return byteBuffer;
+  }
+
+  /**
+   * Splits a list rendered as {@code "[a, b, c]"} into its elements. An empty
+   * list {@code "[]"} yields no elements rather than a single empty string.
+   */
+  private static String[] splitArrayString(String valueString) {
+    String inner = valueString.substring(1, valueString.length() - 1);
+    if (inner.length() == 0) {
+      return new String[0];
+    }
+    return inner.split(", ");
+  }
+}
Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.query;
+
+import java.util.Map;
+
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HSuperColumn;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Cassandra super column, decoded either into an Avro MAP (one entry per
+ * sub column) or into an Avro RECORD (one record field per sub column).
+ */
+public class CassandraSuperColumn extends CassandraColumn {
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
+
+  private HSuperColumn<String, String, String> hSuperColumn;
+
+  @Override
+  public String getName() {
+    return hSuperColumn.getName();
+  }
+
+  /**
+   * Decodes the sub columns according to the schema of the field set via
+   * {@link #setField(Field)}.
+   *
+   * @return a {@link StatefulHashMap} for MAP fields, a new record instance
+   *         for RECORD fields, or {@code null} for unsupported types or when
+   *         the record class cannot be loaded or instantiated
+   */
+  @Override
+  public Object getValue() {
+    Field field = getField();
+    Schema fieldSchema = field.schema();
+    Type type = fieldSchema.getType();
+
+    Object value = null;
+
+    switch (type) {
+      case MAP:
+        value = getMapValue(fieldSchema);
+        break;
+      case RECORD:
+        value = getRecordValue(fieldSchema);
+        break;
+      default:
+        LOG.info("Type not supported: " + type);
+    }
+
+    return value;
+  }
+
+  /**
+   * @param hSuperColumn the raw super column read from Cassandra
+   */
+  public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
+    this.hSuperColumn = hSuperColumn;
+  }
+
+  /**
+   * Builds a map keyed by sub column name, decoding values per the map's
+   * value type (STRING or BYTES; other types are logged and stored as null).
+   */
+  private Map<Utf8, Object> getMapValue(Schema fieldSchema) {
+    Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
+    Type valueType = fieldSchema.getValueType().getType();
+
+    for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
+      String memberString = hColumn.getValue();
+      Object memberValue = null;
+      switch (valueType) {
+        case STRING:
+          memberValue = new Utf8(memberString);
+          break;
+        case BYTES:
+          memberValue = CassandraSubColumn.getByteBuffer(memberString);
+          break;
+        default:
+          LOG.info("Type for the map value is not supported: " + valueType);
+      }
+      map.put(new Utf8(hColumn.getName()), memberValue);
+    }
+    return map;
+  }
+
+  /**
+   * Instantiates the record class named by the schema and fills in the fields
+   * matching the sub column names; sub columns with no matching field are
+   * skipped. Returns {@code null} if the class cannot be loaded or instantiated.
+   */
+  private Object getRecordValue(Schema fieldSchema) {
+    String fullName = fieldSchema.getFullName();
+
+    Object value;
+    try {
+      value = Class.forName(fullName).newInstance();
+    } catch (ClassNotFoundException cnfe) {
+      LOG.warn("Unable to load class " + fullName, cnfe);
+      return null;
+    } catch (InstantiationException ie) {
+      LOG.warn("Instantiation error", ie);
+      return null;
+    } catch (IllegalAccessException iae) {
+      LOG.warn("Illegal access error", iae);
+      return null;
+    }
+
+    // we created the value instance, now update its members
+    if (value instanceof PersistentBase) {
+      PersistentBase record = (PersistentBase) value;
+
+      for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
+        String memberName = hColumn.getName();
+        Field memberField = fieldSchema.getField(memberName);
+        if (memberField == null) {
+          // without this check CassandraSubColumn.getValue() would fail with an NPE
+          LOG.warn("No field " + memberName + " in schema " + fullName + ", skipping sub column");
+          continue;
+        }
+        CassandraSubColumn cassandraColumn = new CassandraSubColumn();
+        cassandraColumn.setField(memberField);
+        cassandraColumn.setValue(hColumn);
+        record.put(record.getFieldIndex(memberName), cassandraColumn.getValue());
+      }
+    }
+    return value;
+  }
+}
Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,253 @@
+package org.apache.gora.cassandra.store;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.service.CassandraHostConfigurator;
+import me.prettyprint.hector.api.Cluster;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.beans.OrderedRows;
+import me.prettyprint.hector.api.beans.OrderedSuperRows;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.SuperRow;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.RangeSlicesQuery;
+import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
+
+import org.apache.gora.cassandra.query.CassandraQuery;
+import org.apache.gora.mapreduce.GoraRecordReader;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.util.ByteUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ public class CassandraClient<K, T extends Persistent> {
+   public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
+
+   /** Replica placement strategy used when this client has to create the keyspace. */
+   private static final String REPLICA_PLACEMENT_STRATEGY = "org.apache.cassandra.locator.SimpleStrategy";
+
+   /** Replication factor used when this client has to create the keyspace. */
+   private static final int REPLICATION_FACTOR = 1;
+
+   private Cluster cluster;
+   private Keyspace keyspace;
+   private Mutator<String> mutator;
+
+   private CassandraMapping cassandraMapping = new CassandraMapping();
+
+   private Serializer<String> stringSerializer = new StringSerializer();
+
+   /**
+    * Loads the Cassandra mapping, connects to the mapped cluster, creates the keyspace when it
+    * does not exist yet, and prepares the mutator used by the write methods.
+    * @throws Exception if loading the mapping or talking to the cluster fails
+    */
+   public void init() throws Exception {
+     this.cassandraMapping.loadConfiguration();
+     this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(),
+         new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
+
+     // add keyspace to cluster
+     checkKeyspace();
+
+     // Client-side handle on the keyspace; checkKeyspace() has just made sure it exists on the cluster.
+     this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
+
+     this.mutator = HFactory.createMutator(this.keyspace, this.stringSerializer);
+   }
+
+   /**
+    * Checks whether the mapped keyspace exists on the cluster. If it does not, creates it together
+    * with every column family declared in the mapping.
+    */
+   public void checkKeyspace() {
+     String keyspaceName = this.cassandraMapping.getKeyspaceName();
+
+     // "describe keyspace <keyspaceName>;" query
+     KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(keyspaceName);
+     if (keyspaceDefinition != null) {
+       return;
+     }
+
+     List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
+     keyspaceDefinition = HFactory.createKeyspaceDefinition(keyspaceName, REPLICA_PLACEMENT_STRATEGY,
+         REPLICATION_FACTOR, columnFamilyDefinitions);
+     this.cluster.addKeyspace(keyspaceDefinition);
+     LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
+   }
+
+   /**
+    * Drops the mapped keyspace from the cluster.
+    */
+   public void dropKeyspace() {
+     // "drop keyspace <keyspaceName>;" query
+     this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
+   }
+
+   /**
+    * Inserts a field value as a standard column, in the family and column mapped to the field.
+    * Null values are ignored. ByteBuffer values are stored as their decoded string; any other
+    * value is stored as its toString().
+    * @param key the row key
+    * @param fieldName the field name
+    * @param value the field value
+    */
+   public void addColumn(String key, String fieldName, Object value) {
+     if (value == null) {
+       return;
+     }
+     if (value instanceof ByteBuffer) {
+       value = toString((ByteBuffer) value);
+     }
+
+     String columnFamily = this.cassandraMapping.getFamily(fieldName);
+     String columnName = this.cassandraMapping.getColumn(fieldName);
+
+     this.mutator.insert(key, columnFamily, HFactory.createStringColumn(columnName, value.toString()));
+   }
+
+   /**
+    * Decodes the readable bytes of a buffer (from its position to its limit) into a string.
+    * The bytes are read through a duplicate, so the caller's buffer position is left untouched.
+    * This also works for direct, read-only and sliced buffers, which array() with a zero offset
+    * does not handle.
+    * TODO do not convert bytes to string to store a binary field
+    * @param value the buffer to decode
+    * @return the decoded string
+    */
+   private static String toString(ByteBuffer value) {
+     ByteBuffer buffer = value.duplicate();
+     byte[] bytes = new byte[buffer.remaining()];
+     buffer.get(bytes);
+     return ByteUtils.toString(bytes, 0, bytes.length);
+   }
+
+   /**
+    * Inserts a member in a super column. This is used for map and record Avro types.
+    * Null values are ignored. ByteBuffer values are stored as their decoded string; any other
+    * value is stored as its toString().
+    * @param key the row key
+    * @param fieldName the field name, whose mapped column is used as the super column name
+    * @param memberName the member name, used as the sub-column name
+    * @param value the member value
+    */
+   public void addSubColumn(String key, String fieldName, String memberName, Object value) {
+     if (value == null) {
+       return;
+     }
+
+     if (value instanceof ByteBuffer) {
+       value = toString((ByteBuffer) value);
+     }
+
+     String columnFamily = this.cassandraMapping.getFamily(fieldName);
+     String superColumnName = this.cassandraMapping.getColumn(fieldName);
+
+     this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName,
+         Arrays.asList(HFactory.createStringColumn(memberName, value.toString())),
+         this.stringSerializer, this.stringSerializer, this.stringSerializer));
+   }
+
+   /**
+    * Selects the query columns of a standard column family over the query key range.
+    * A missing start or end key leaves that end of the range open (empty key). A limit below 1
+    * is treated as "no limit".
+    * @param cassandraQuery a wrapper of the query
+    * @param family the family name to be queried
+    * @return a list of family rows
+    */
+   public List<Row<String, String, String>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
+     String[] columnNames = cassandraQuery.getColumns(family);
+     Query<K, T> query = cassandraQuery.getQuery();
+
+     RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory.createRangeSlicesQuery(
+         this.keyspace, this.stringSerializer, this.stringSerializer, this.stringSerializer);
+     rangeSlicesQuery.setColumnFamily(family);
+     rangeSlicesQuery.setKeys(toRangeKey(query.getStartKey()), toRangeKey(query.getEndKey()));
+     rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
+     rangeSlicesQuery.setRowCount(toRowCount(query.getLimit()));
+     rangeSlicesQuery.setColumnNames(columnNames);
+
+     QueryResult<OrderedRows<String, String, String>> queryResult = rangeSlicesQuery.execute();
+     OrderedRows<String, String, String> orderedRows = queryResult.get();
+     return orderedRows.getList();
+   }
+
+   /**
+    * Selects the families that contain at least one column mapped to a query field.
+    * @param query indicates the columns to select
+    * @return a map whose keys are the family names and whose values are the column names
+    *         required to get all the query fields (fields without a mapped column add none)
+    */
+   public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
+     Map<String, List<String>> map = new HashMap<String, List<String>>();
+     for (String field: query.getFields()) {
+       String family = this.cassandraMapping.getFamily(field);
+       String column = this.cassandraMapping.getColumn(field);
+
+       // check if the family value was already initialized
+       List<String> list = map.get(family);
+       if (list == null) {
+         list = new ArrayList<String>();
+         map.put(family, list);
+       }
+
+       if (column != null) {
+         list.add(column);
+       }
+     }
+
+     return map;
+   }
+
+   /**
+    * Maps fully qualified column names, in the "family:column" format, back to the query fields.
+    * @param query indicates the fields to map
+    * @return a map whose keys are the fully qualified column names and whose values are the query fields
+    */
+   public Map<String, String> getReverseMap(Query<K, T> query) {
+     Map<String, String> map = new HashMap<String, String>();
+     for (String field: query.getFields()) {
+       String family = this.cassandraMapping.getFamily(field);
+       String column = this.cassandraMapping.getColumn(field);
+
+       map.put(family + ":" + column, field);
+     }
+
+     return map;
+   }
+
+   /**
+    * @param family the family name
+    * @return true if the mapping declares the family as a super column family
+    */
+   public boolean isSuper(String family) {
+     return this.cassandraMapping.isSuper(family);
+   }
+
+   /**
+    * Selects the query super columns of a super column family over the query key range.
+    * A missing start or end key leaves that end of the range open (empty key). A limit below 1
+    * is treated as "no limit".
+    * @param cassandraQuery a wrapper of the query
+    * @param family the super column family name to be queried
+    * @return a list of super rows
+    */
+   public List<SuperRow<String, String, String, String>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
+     String[] columnNames = cassandraQuery.getColumns(family);
+     Query<K, T> query = cassandraQuery.getQuery();
+
+     RangeSuperSlicesQuery<String, String, String, String> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(
+         this.keyspace, this.stringSerializer, this.stringSerializer, this.stringSerializer, this.stringSerializer);
+     rangeSuperSlicesQuery.setColumnFamily(family);
+     rangeSuperSlicesQuery.setKeys(toRangeKey(query.getStartKey()), toRangeKey(query.getEndKey()));
+     rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
+     rangeSuperSlicesQuery.setRowCount(toRowCount(query.getLimit()));
+     rangeSuperSlicesQuery.setColumnNames(columnNames);
+
+     QueryResult<OrderedSuperRows<String, String, String, String>> queryResult = rangeSuperSlicesQuery.execute();
+     OrderedSuperRows<String, String, String, String> orderedRows = queryResult.get();
+     return orderedRows.getList();
+   }
+
+   /**
+    * Converts a query start or end key into a range key. A missing key becomes the empty key,
+    * which leaves that end of the range open. Keys are expected to be Strings; any other key type
+    * fails with a ClassCastException, as before.
+    */
+   private static String toRangeKey(Object key) {
+     return key == null ? "" : (String) key;
+   }
+
+   /**
+    * Converts a Gora query limit into a Cassandra row count. A limit below 1 means "no limit", but
+    * Cassandra rejects non-positive row counts, so it is mapped to Integer.MAX_VALUE. Limits
+    * beyond the int range are clamped instead of overflowing in the cast.
+    */
+   private static int toRowCount(long limit) {
+     if (limit < 1 || limit > Integer.MAX_VALUE) {
+       return Integer.MAX_VALUE;
+     }
+     return (int) limit;
+   }
+ }
Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jul 22 00:33:59 2011
@@ -1,50 +1,155 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.gora.cassandra.store;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
+
+import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
+import me.prettyprint.cassandra.service.ThriftCfDef;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.ColumnType;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class CassandraMapping {
+
+   public static final Logger LOG = LoggerFactory.getLogger(CassandraMapping.class);
+
+   private static final String MAPPING_FILE = "gora-cassandra-mapping.xml";
+   private static final String KEYSPACE_ELEMENT = "keyspace";
+   private static final String NAME_ATTRIBUTE = "name";
+   private static final String MAPPING_ELEMENT = "class";
+   private static final String COLUMN_ATTRIBUTE = "qualifier";
+   private static final String FAMILY_ATTRIBUTE = "family";
+   private static final String SUPER_ATTRIBUTE = "type";
+   /** Value of the family "type" attribute that declares a super column family. */
+   private static final String SUPER_TYPE = "super";
+   private static final String CLUSTER_ATTRIBUTE = "cluster";
+   private static final String HOST_ATTRIBUTE = "host";
+
+   private String hostName;
+   private String clusterName;
+   private String keyspaceName;
+
+   /**
+    * List of the super column families.
+    */
+   private List<String> superFamilies = new ArrayList<String>();
+
+   /**
+    * Look up the column family associated to the Avro field.
+    */
+   private Map<String, String> familyMap = new HashMap<String, String>();
+
+   /**
+    * Look up the column associated to the Avro field.
+    */
+   private Map<String, String> columnMap = new HashMap<String, String>();
+
+   /**
+    * Look up the column family from its name.
+    */
+   private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = new HashMap<String, BasicColumnFamilyDefinition>();
- private String keySpace;
+
+   /** @return the host name read from the "host" attribute of the keyspace element */
+   public String getHostName() {
+     return this.hostName;
+   }
- private Map<String, Boolean> families =
- new HashMap<String, Boolean>();
+
+   /** @return the cluster name read from the "cluster" attribute of the keyspace element */
+   public String getClusterName() {
+     return this.clusterName;
+   }
- public String getKeySpace() {
- return keySpace;
+
+   /** @return the keyspace name read from the "name" attribute of the keyspace element */
+   public String getKeyspaceName() {
+     return this.keyspaceName;
}
- public void setKeySpace(String keySpace) {
- this.keySpace = keySpace;
+
+   /**
+    * Loads {@value #MAPPING_FILE} from the classpath. Reads:
+    * the keyspace settings (name, cluster, host) from the keyspace element;
+    * one column family definition per child of the keyspace element;
+    * the field-to-family/column mapping from the children of the class element.
+    * @throws IOException if the mapping file is not on the classpath, cannot be read,
+    *         or lacks the keyspace or class element
+    * @throws JDOMException if the mapping file cannot be parsed
+    */
+   @SuppressWarnings("unchecked")
+   public void loadConfiguration() throws JDOMException, IOException {
+     Document document = readMappingDocument();
+     Element root = document.getRootElement();
+
+     Element keyspace = root.getChild(KEYSPACE_ELEMENT);
+     if (keyspace == null) {
+       throw new IOException("Mapping file " + MAPPING_FILE + " has no <" + KEYSPACE_ELEMENT + "> element");
+     }
+     this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
+     this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
+     this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
+
+     // load column family definitions
+     List<Element> elements = keyspace.getChildren();
+     for (Element element: elements) {
+       BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition();
+
+       String familyName = element.getAttributeValue(NAME_ATTRIBUTE);
+
+       // Only type="super" declares a super column family; any other type, or none, is a standard family.
+       String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
+       if (SUPER_TYPE.equalsIgnoreCase(superAttribute)) {
+         this.superFamilies.add(familyName);
+         cfDef.setColumnType(ColumnType.SUPER);
+         cfDef.setSubComparatorType(ComparatorType.UTF8TYPE);
+       }
+
+       cfDef.setKeyspaceName(this.keyspaceName);
+       cfDef.setName(familyName);
+       cfDef.setComparatorType(ComparatorType.UTF8TYPE);
+       cfDef.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
+
+       this.columnFamilyDefinitions.put(familyName, cfDef);
+     }
+
+     // load column definitions
+     Element mapping = root.getChild(MAPPING_ELEMENT);
+     if (mapping == null) {
+       throw new IOException("Mapping file " + MAPPING_FILE + " has no <" + MAPPING_ELEMENT + "> element");
+     }
+     elements = mapping.getChildren();
+     for (Element element: elements) {
+       String fieldName = element.getAttributeValue(NAME_ATTRIBUTE);
+       String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
+       String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
+       if (!this.columnFamilyDefinitions.containsKey(familyName)) {
+         LOG.warn("Family " + familyName + " was not declared in the keyspace.");
+       }
+
+       this.familyMap.put(fieldName, familyName);
+       this.columnMap.put(fieldName, columnName);
+     }
}
- public Set<String> getColumnFamilies() {
- return families.keySet();
+
+   /**
+    * @param name the Avro field name
+    * @return the column family the field is mapped to, or null if the field is not mapped
+    */
+   public String getFamily(String name) {
+     return this.familyMap.get(name);
}
- public void addColumnFamily(String columnFamily, boolean isSuper) {
- families.put(columnFamily, isSuper);
+
+   /**
+    * @param name the Avro field name
+    * @return the column (qualifier) the field is mapped to, or null if the field is not mapped
+    *         or its mapping has no qualifier
+    */
+   public String getColumn(String name) {
+     return this.columnMap.get(name);
}
- public boolean isColumnFamilySuper(String columnFamily) {
- return families.get(columnFamily);
+
+   /**
+    * Reads the family super attribute.
+    * @param family the family name
+    * @return true if the family is a super column family
+    */
+   public boolean isSuper(String family) {
+     return this.superFamilies.contains(family);
}
+
+   /**
+    * Builds the Thrift-backed definition of every column family declared in the mapping.
+    * These are the definitions used when the keyspace has to be created.
+    * @return a new list with one definition per declared column family
+    */
+   public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() {
+     List<ColumnFamilyDefinition> list = new ArrayList<ColumnFamilyDefinition>(this.columnFamilyDefinitions.size());
+     for (ColumnFamilyDefinition columnFamilyDefinition: this.columnFamilyDefinitions.values()) {
+       list.add(new ThriftCfDef(columnFamilyDefinition));
+     }
+     return list;
+   }
+
+   /**
+    * Parses {@value #MAPPING_FILE} from the classpath and closes the stream once it is parsed.
+    * @throws IOException if the mapping file is not on the classpath or cannot be read
+    * @throws JDOMException if the mapping file cannot be parsed
+    */
+   private Document readMappingDocument() throws JDOMException, IOException {
+     java.io.InputStream inputStream = getClass().getClassLoader().getResourceAsStream(MAPPING_FILE);
+     if (inputStream == null) {
+       throw new IOException("Mapping file " + MAPPING_FILE + " was not found on the classpath");
+     }
+     try {
+       return new SAXBuilder().build(inputStream);
+     } finally {
+       inputStream.close();
+     }
+   }
+
}
Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 22 00:33:59 2011
@@ -1,465 +1,334 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.gora.cassandra.store;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
+
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HSuperColumn;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.SuperRow;
+import me.prettyprint.hector.api.beans.SuperSlice;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.util.Utf8;
-import org.apache.cassandra.thrift.TokenRange;
-import org.apache.gora.cassandra.client.CassandraClient;
-import org.apache.gora.cassandra.client.Mutate;
-import org.apache.gora.cassandra.client.Row;
-import org.apache.gora.cassandra.client.Select;
-import org.apache.gora.cassandra.client.SimpleCassandraClient;
-import org.apache.gora.cassandra.query.CassandraPartitionQuery;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.query.CassandraResult;
-import org.apache.gora.persistency.ListGenericArray;
+import org.apache.gora.cassandra.query.CassandraResultSet;
+import org.apache.gora.cassandra.query.CassandraRow;
+import org.apache.gora.cassandra.query.CassandraSubColumn;
+import org.apache.gora.cassandra.query.CassandraSuperColumn;
import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
-import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.query.impl.PartitionQueryImpl;
import org.apache.gora.store.impl.DataStoreBase;
-import org.apache.gora.util.ByteUtils;
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.input.SAXBuilder;
-
-/**
- * DataStore for Cassandra.
- *
- * <p> Note: CassandraStore is not thread-safe. </p>
- */
-public class CassandraStore<K, T extends Persistent>
-extends DataStoreBase<K, T> {
-
- private static final String ERROR_MESSAGE =
- "Cassandra does not support creating or modifying ColumnFamilies during runtime";
-
- private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml";
-
- private static final int SPLIT_SIZE = 65536;
-
- private static final int BATCH_COUNT = 256;
-
- private CassandraClient client;
-
- private Map<String, CassandraColumn> columnMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- private CassandraMapping mapping;
-
- @Override
- public void initialize(Class<K> keyClass, Class<T> persistentClass,
- Properties properties) throws IOException {
- super.initialize(keyClass, persistentClass, properties);
-
- String mappingFile =
- DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
-
- readMapping(mappingFile);
+public class CassandraStore<K, T extends Persistent> extends DataStoreBase<K, T> {
+ public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
+
+ private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
+
+ /**
+ * The values are Avro fields pending to be stored.
+ */
+ private Map<K, T> buffer = new HashMap<K, T>();
+
+ public CassandraStore() throws Exception {
+ this.cassandraClient.init();
}
@Override
- public String getSchemaName() {
- return mapping.getKeySpace();
+ public void close() throws IOException {
+ LOG.debug("close");
+ flush();
}
@Override
- public void createSchema() throws IOException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ public void createSchema() {
+ LOG.debug("create schema");
+ this.cassandraClient.checkKeyspace();
}
@Override
- public void deleteSchema() throws IOException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ public boolean delete(K key) throws IOException {
+ LOG.debug("delete " + key);
+ return false;
}
@Override
- public boolean schemaExists() throws IOException {
- return true;
+ public long deleteByQuery(Query<K, T> query) throws IOException {
+ LOG.debug("delete by query " + query);
+ return 0;
}
- public CassandraClient getClientByLocation(String endPoint) {
- return client;
+ @Override
+ public void deleteSchema() throws IOException {
+ LOG.debug("delete schema");
+ this.cassandraClient.dropKeyspace();
}
- public Select createSelect(String[] fields) {
- Select select = new Select();
- if (fields == null) {
- fields = beanFactory.getCachedPersistent().getFields();
- }
- for (String f : fields) {
- CassandraColumn col = columnMap.get(f);
- Schema fieldSchema = fieldMap.get(f).schema();
- switch (fieldSchema.getType()) {
- case MAP:
- case ARRAY:
- if (col.isSuperColumn()) {
- select.addAllColumnsForSuperColumn(col.family, col.superColumn);
- } else {
- select.addColumnAll(col.family);
- }
- break;
- default:
- if (col.isSuperColumn()) {
- select.addColumnName(col.family, col.superColumn, col.column);
- } else {
- select.addColumnName(col.family, col.column);
- }
- break;
+ @Override
+ public Result<K, T> execute(Query<K, T> query) throws IOException {
+
+ Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
+ Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
+
+ CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
+ cassandraQuery.setQuery(query);
+ cassandraQuery.setFamilyMap(familyMap);
+
+ CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
+ cassandraResult.setReverseMap(reverseMap);
+
+ CassandraResultSet cassandraResultSet = new CassandraResultSet();
+
+ // We query Cassandra keyspace by families.
+ for (String family : familyMap.keySet()) {
+ if (this.cassandraClient.isSuper(family)) {
+ addSuperColumns(family, cassandraQuery, cassandraResultSet);
+
+ } else {
+ addSubColumns(family, cassandraQuery, cassandraResultSet);
+
}
+
}
- return select;
- }
+
+ cassandraResult.setResultSet(cassandraResultSet);
+
+
+ return cassandraResult;
- @Override
- public T get(K key, String[] fields) throws IOException {
- if (fields == null) {
- fields = beanFactory.getCachedPersistent().getFields();
- }
- Select select = createSelect(fields);
- try {
- Row result = client.get(key.toString(), select);
- return newInstance(result, fields);
- } catch (Exception e) {
- throw new IOException(e);
- }
}
- @SuppressWarnings("rawtypes")
- private void setField(T persistent, Field field, StatefulMap map) {
- persistent.put(field.pos(), map);
- }
-
- private void setField(T persistent, Field field, byte[] val)
- throws IOException {
- persistent.put(field.pos()
- , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos())));
- }
-
- @SuppressWarnings("rawtypes")
- private void setField(T persistent, Field field, GenericArray list) {
- persistent.put(field.pos(), list);
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public T newInstance(Row result, String[] fields)
- throws IOException {
- if(result == null)
- return null;
-
- T persistent = newPersistent();
- StateManager stateManager = persistent.getStateManager();
- for (String f : fields) {
- CassandraColumn col = columnMap.get(f);
- Field field = fieldMap.get(f);
- Schema fieldSchema = field.schema();
- Map<String, byte[]> qualMap;
- switch(fieldSchema.getType()) {
- case MAP:
- if (col.isSuperColumn()) {
- qualMap = result.getSuperColumn(col.family, col.superColumn);
- } else {
- qualMap = result.getColumn(col.family);
- }
- if (qualMap == null) {
- continue;
- }
- Schema valueSchema = fieldSchema.getValueType();
- StatefulMap map = new StatefulHashMap();
- for (Entry<String, byte[]> e : qualMap.entrySet()) {
- Utf8 mapKey = new Utf8(e.getKey());
- map.put(mapKey, ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
- map.putState(mapKey, State.CLEAN);
- }
- setField(persistent, field, map);
- break;
- case ARRAY:
- if (col.isSuperColumn()) {
- qualMap = result.getSuperColumn(col.family, col.superColumn);
- } else {
- qualMap = result.getColumn(col.family);
- }
- if (qualMap == null) {
- continue;
- }
- valueSchema = fieldSchema.getElementType();
- ArrayList arrayList = new ArrayList();
- for (Entry<String, byte[]> e : qualMap.entrySet()) {
- arrayList.add(ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
- }
- ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
- setField(persistent, field, arr);
- break;
- default:
- byte[] val;
- if (col.isSuperColumn()) {
- val = result.get(col.family, col.superColumn, col.column);
- } else {
- val = result.get(col.family, col.column);
- }
- if (val == null) {
- continue;
- }
- setField(persistent, field, val);
- break;
+ private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
+ CassandraResultSet cassandraResultSet) {
+ // select family columns that are included in the query
+ List<Row<String, String, String>> rows = this.cassandraClient.execute(cassandraQuery, family);
+
+ for (Row<String, String, String> row : rows) {
+ String key = row.getKey();
+
+ // find associated row in the resultset
+ CassandraRow cassandraRow = cassandraResultSet.getRow(key);
+ if (cassandraRow == null) {
+ cassandraRow = new CassandraRow();
+ cassandraResultSet.putRow(key, cassandraRow);
+ cassandraRow.setKey(key);
}
+
+ ColumnSlice<String, String> columnSlice = row.getColumnSlice();
+
+ for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
+ CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
+ cassandraSubColumn.setValue(hColumn);
+ cassandraSubColumn.setFamily(family);
+ cassandraRow.add(cassandraSubColumn);
+ }
+
}
- stateManager.clearDirty(persistent);
- return persistent;
}
- @Override
- public void put(K key, T obj) throws IOException {
- Mutate mutate = new Mutate();
- Schema schema = obj.getSchema();
- StateManager stateManager = obj.getStateManager();
- List<Field> fields = schema.getFields();
- String qual;
- byte[] value;
- for (int i = 0; i < fields.size(); i++) {
- if (!stateManager.isDirty(obj, i)) {
- continue;
+ private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
+ CassandraResultSet cassandraResultSet) {
+
+ List<SuperRow<String, String, String, String>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
+ for (SuperRow<String, String, String, String> superRow: superRows) {
+ String key = superRow.getKey();
+ CassandraRow cassandraRow = cassandraResultSet.getRow(key);
+ if (cassandraRow == null) {
+ cassandraRow = new CassandraRow();
+ cassandraResultSet.putRow(key, cassandraRow);
+ cassandraRow.setKey(key);
}
- Field field = fields.get(i);
- Type type = field.schema().getType();
- Object o = obj.get(i);
- CassandraColumn col = columnMap.get(field.name());
-
- switch(type) {
- case MAP:
- if(o instanceof StatefulMap) {
- @SuppressWarnings("unchecked")
- StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) o;
- for (Entry<Utf8, State> e : map.states().entrySet()) {
- Utf8 mapKey = e.getKey();
- switch (e.getValue()) {
- case DIRTY:
- qual = mapKey.toString();
- value = ByteUtils.toBytes(map.get(mapKey), field.schema().getValueType(), datumWriter);
- if (col.isSuperColumn()) {
- mutate.put(col.family, col.superColumn, qual, value);
- } else {
- mutate.put(col.family, qual, value);
- }
- break;
- case DELETED:
- qual = mapKey.toString();
- if (col.isSuperColumn()) {
- mutate.delete(col.family, col.superColumn, qual);
- } else {
- mutate.delete(col.family, qual);
- }
- break;
- }
- }
- } else {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- Set<Map.Entry> set = ((Map)o).entrySet();
- for(@SuppressWarnings("rawtypes") Entry entry: set) {
- qual = entry.getKey().toString();
- value = ByteUtils.toBytes(entry.getValue().toString());
- if (col.isSuperColumn()) {
- mutate.put(col.family, col.superColumn, qual, value);
- } else {
- mutate.put(col.family, qual, value);
- }
- }
- }
- break;
- case ARRAY:
- if(o instanceof GenericArray) {
- @SuppressWarnings("rawtypes")
- GenericArray arr = (GenericArray) o;
- int j=0;
- for(Object item : arr) {
- value = ByteUtils.toBytes(item.toString());
- if (col.isSuperColumn()) {
- mutate.put(col.family, col.superColumn, Integer.toString(j), value);
- } else {
- mutate.put(col.family, Integer.toString(j), value);
- }
- j++;
- }
- }
- break;
- default:
- value = ByteUtils.toBytes(o, field.schema(), datumWriter);
- if (col.isSuperColumn()) {
- mutate.put(col.family, col.superColumn, col.column, value);
- } else {
- mutate.put(col.family, col.column, value);
- }
- break;
+
+ SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
+ for (HSuperColumn<String, String, String> hSuperColumn: superSlice.getSuperColumns()) {
+ CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
+ cassandraSuperColumn.setValue(hSuperColumn);
+ cassandraSuperColumn.setFamily(family);
+ cassandraRow.add(cassandraSuperColumn);
}
}
-
- if(!mutate.isEmpty())
- client.mutate(key.toString(), mutate);
}
- @Override
- public boolean delete(K key) throws IOException {
- Mutate mutate = new Mutate();
- for (String family : mapping.getColumnFamilies()) {
- mutate.deleteAll(family);
+ /**
+ * Flush the buffer. Write the buffered rows.
+ * @see org.apache.gora.store.DataStore#flush()
+ */
+ @Override
+ public void flush() throws IOException {
+ for (K key: this.buffer.keySet()) {
+ T value = this.buffer.get(key);
+ Schema schema = value.getSchema();
+ for (Field field: schema.getFields()) {
+ if (value.isDirty(field.pos())) {
+ addOrUpdateField((String) key, field, value.get(field.pos()));
+ }
+ }
}
-
- client.mutate(key.toString(), mutate);
- return true;
+
+ this.buffer.clear();
}
@Override
- public void flush() throws IOException { }
-
- @Override
- public void close() throws IOException {
- client.close();
+ public T get(K key, String[] fields) throws IOException {
+ LOG.info("get " + key);
+ return null;
}
@Override
- public Query<K, T> newQuery() {
- return new CassandraQuery<K, T>(this);
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+ throws IOException {
+ // The whole query is served as one partition; no token-range splitting is done.
+ PartitionQuery<K, T> single = new PartitionQueryImpl<K, T>(query);
+ List<PartitionQuery<K, T>> result = new ArrayList<PartitionQuery<K, T>>();
+ result.add(single);
+ return result;
}
@Override
- public long deleteByQuery(Query<K, T> query) throws IOException {
- // TODO Auto-generated method stub
- return 0;
+ public String getSchemaName() {
+ LOG.info("get schema name");
+ return null;
}
@Override
- public Result<K, T> execute(Query<K, T> query) throws IOException {
- return new CassandraResult<K, T>(this, query, BATCH_COUNT);
+ public Query<K, T> newQuery() {
+ return new CassandraQuery<K, T>(this);
}
- @Override
- public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
- throws IOException {
- List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
-
- List<TokenRange> rangeList = client.describeRing();
- for (TokenRange range : rangeList) {
- List<String> tokens =
- client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE);
- // turn the sub-ranges into InputSplits
- String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
- // hadoop needs hostname, not ip
- for (int i = 0; i < endpoints.length; i++) {
- endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
- }
-
- for (int i = 1; i < tokens.size(); i++) {
- CassandraPartitionQuery<K, T> partitionQuery =
- new CassandraPartitionQuery<K, T>(query, tokens.get(i - 1), tokens.get(i), endpoints, SPLIT_SIZE);
- partitions.add(partitionQuery);
+ /**
+ * Buffer a copy of {@code value} under {@code key} until {@link #flush()} is called.
+ * Only dirty fields are copied. A record field is copied into a new instance member by
+ * member, and a map field is copied into a new StatefulHashMap, so the caller's top-level
+ * record/map objects are not shared with the buffer (their member values still are).
+ * A null record or map field is buffered as null instead of raising a NullPointerException.
+ * Other field types (including arrays) are buffered without copying.
+ * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
+ */
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void put(K key, T value) throws IOException {
+ T p = (T) value.newInstance(new StateManagerImpl());
+ Schema schema = value.getSchema();
+ for (Field field: schema.getFields()) {
+ if (value.isDirty(field.pos())) {
+ Object fieldValue = value.get(field.pos());
+
+ // copy nested structures (record or map); a null nested value is kept as null
+ Schema fieldSchema = field.schema();
+ Type type = fieldSchema.getType();
+ if (fieldValue != null) {
+ switch(type) {
+ case RECORD:
+ Persistent persistent = (Persistent) fieldValue;
+ Persistent newRecord = persistent.newInstance(new StateManagerImpl());
+ for (Field member: fieldSchema.getFields()) {
+ newRecord.put(member.pos(), persistent.get(member.pos()));
+ }
+ fieldValue = newRecord;
+ break;
+ case MAP:
+ StatefulHashMap<?, ?> map = (StatefulHashMap<?, ?>) fieldValue;
+ fieldValue = new StatefulHashMap(map);
+ break;
+ default:
+ // other types are buffered without copying
+ break;
+ }
+ }
+
+ p.put(field.pos(), fieldValue);
}
}
- return partitions;
- }
-
- private CassandraClient createClient() throws IOException {
- String serverStr =
- DataStoreFactory.findPropertyOrDie(properties, this, "servers");
- String[] server1Parts = serverStr.split(",")[0].split(":");
- try {
- return new SimpleCassandraClient(server1Parts[0],
- Integer.parseInt(server1Parts[1]), mapping.getKeySpace());
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- @SuppressWarnings("unchecked")
- protected void readMapping(String filename) throws IOException {
-
- mapping = new CassandraMapping();
- columnMap = new HashMap<String, CassandraColumn>();
-
- try {
- SAXBuilder builder = new SAXBuilder();
- Document doc = builder.build(getClass().getClassLoader()
- .getResourceAsStream(filename));
-
- List<Element> classes = doc.getRootElement().getChildren("class");
-
- for(Element classElement: classes) {
- if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
- && classElement.getAttributeValue("name").equals(
- persistentClass.getCanonicalName())) {
-
- String keySpace = classElement.getAttributeValue("keyspace");
- mapping.setKeySpace(keySpace);
- client = createClient();
- Map<String, Map<String, String>> keySpaceDesc = client.describeKeySpace();
- for (Entry<String, Map<String, String>> e : keySpaceDesc.entrySet()) {
- boolean isSuper = e.getValue().get("Type").equals("Super");
- mapping.addColumnFamily(e.getKey(), isSuper);
- }
-
- List<Element> fields = classElement.getChildren("field");
-
- for(Element field:fields) {
- String fieldName = field.getAttributeValue("name");
- String path = field.getAttributeValue("path");
- String[] parts = path.split(":");
- String columnFamily = parts[0];
- String superColumn = null;
- String column = null;
-
- boolean isSuper = mapping.isColumnFamilySuper(columnFamily);
- if (isSuper) {
- superColumn = parts[1];
- if (parts.length == 3) {
- column = parts[2];
+
+ this.buffer.put(key, p);
+ }
+
+ /**
+ * Add a field to Cassandra according to its type.
+ * STRING, INT, LONG, BYTES and FLOAT values are written with addColumn under the field name.
+ * RECORD (PersistentBase) members and MAP (StatefulHashMap) entries are written with
+ * addSubColumn under the field name; nested empty arrays are skipped.
+ * A null record or map is ignored. Records/maps of other classes and every other Avro type
+ * (e.g. DOUBLE, BOOLEAN, ARRAY) are only logged, not written.
+ * @param key the key of the row where the field should be added
+ * @param field the Avro field representing a datum
+ * @param value the field value
+ */
+ private void addOrUpdateField(String key, Field field, Object value) {
+ Schema schema = field.schema();
+ Type type = schema.getType();
+ switch (type) {
+ case STRING:
+ case INT:
+ case LONG:
+ case BYTES:
+ case FLOAT:
+ // simple values map to a single column named after the field
+ this.cassandraClient.addColumn(key, field.name(), value);
+ break;
+ case RECORD:
+ if (value != null) {
+ if (value instanceof PersistentBase) {
+ PersistentBase persistentBase = (PersistentBase) value;
+ for (Field member: schema.getFields()) {
+
+ // TODO: hack, do not store empty arrays
+ Object memberValue = persistentBase.get(member.pos());
+ if (memberValue instanceof GenericArray<?>) {
+ GenericArray<?> array = (GenericArray<?>) memberValue;
+ if (array.size() == 0) {
+ continue;
+ }
}
- } else {
- if (parts.length == 2) {
- column = parts[1];
+
+ this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
+ }
+ } else {
+ LOG.info("Record not supported: " + value.toString());
+
+ }
+ }
+ break;
+ case MAP:
+ if (value != null) {
+ if (value instanceof StatefulHashMap<?, ?>) {
+ //TODO cast to stateful map and only write dirty keys
+ Map<?, ?> map = (Map<?, ?>) value;
+ for (Map.Entry<?, ?> entry: map.entrySet()) {
+
+ // TODO: hack, do not store empty arrays
+ Object keyValue = entry.getValue();
+ if (keyValue instanceof GenericArray<?>) {
+ GenericArray<?> array = (GenericArray<?>) keyValue;
+ if (array.size() == 0) {
+ continue;
+ }
}
+
+ this.cassandraClient.addSubColumn(key, field.name(), entry.getKey().toString(), keyValue);
}
-
- columnMap.put(fieldName,
- new CassandraColumn(columnFamily, superColumn, column));
+ } else {
+ LOG.info("Map not supported: " + value.toString());
}
-
- break;
}
- }
- } catch(Exception ex) {
- throw new IOException(ex);
+ break;
+ default:
+ LOG.info("Type not considered: " + type.name());
}
}
-}
\ No newline at end of file
+
+ @Override
+ public boolean schemaExists() throws IOException {
+ LOG.info("schema exists");
+ return false;
+ }
+
+}
Modified: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (original)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Fri Jul 22 00:33:59 2011
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
public class GoraRecordReader<K, T extends Persistent> extends RecordReader<K,T> {
public static final Logger LOG = LoggerFactory.getLogger(GoraRecordReader.class);
- private static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
- private static final int BUFFER_LIMIT_READ_VALUE = 10000;
+ public static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
+ public static final int BUFFER_LIMIT_READ_VALUE = 10000;
protected Query<K,T> query;
protected Result<K,T> result;