You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gora.apache.org by Henry Saputra <he...@gmail.com> on 2011/07/22 02:39:16 UTC
Re: svn commit: r1149420 - in /incubator/gora/trunk:
gora-cassandra/ivy/ gora-cassandra/lib-ext/ gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java
Looks like some of the newly added files are missing the Apache license header.
- Henry
On Thu, Jul 21, 2011 at 5:34 PM, <al...@apache.org> wrote:
> Author: alexis
> Date: Fri Jul 22 00:33:59 2011
> New Revision: 1149420
>
> URL: http://svn.apache.org/viewvc?rev=1149420&view=rev
> Log:
> Cassandra 0.8 backend with Hector client
>
> Added:
> incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar (with props)
> incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar (with props)
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
> Removed:
> incubator/gora/trunk/gora-cassandra/lib-ext/apache-cassandra-0.6.4.jar
> incubator/gora/trunk/gora-cassandra/lib-ext/libthrift-r917130.jar
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraColumn.java
> Modified:
> incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
> incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
>
> Modified: incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/ivy/ivy.xml?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/ivy/ivy.xml (original)
> +++ incubator/gora/trunk/gora-cassandra/ivy/ivy.xml Fri Jul 22 00:33:59 2011
> @@ -22,7 +22,7 @@
> organisation="org.apache.gora"
> module="gora-cassandra"
> status="integration"/>
> -
> +
> <configurations>
> <include file="${project.dir}/ivy/ivy-configurations.xml"/>
> </configurations>
> @@ -32,15 +32,25 @@
> <artifact name="gora-cassandra-test" conf="test"/>
> </publications>
>
> +
> <dependencies>
> <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
> - <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
> - <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
> -
> - <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.8" conf="*->master"/>
> - <dependency org="org.slf4j" name="slf4j-api" rev="1.5.8" conf="*->master"/>
> -
> - <dependency org="com.google.guava" name="guava" rev="r06"/>
> +
> + <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
> +
> + <dependency org="org.jdom" name="jdom" rev="1.1">
> + <exclude org="xerces" name="xercesImpl"/>
> + </dependency>
> +
> + <!--
> + <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.8.1"/>
> + <dependency org="me.prettyprint" name="hector" rev="0.8.0-1"/>
> + -->
> + <dependency org="org.apache.cassandra" name="cassandra-thrift" rev="0.8.1"/>
> + <dependency org="com.ecyrd.speed4j" name="speed4j" rev="0.9" conf="*->*,!javadoc,!sources"/>
> + <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="1.1.2" conf="*->*,!javadoc,!sources"/>
> + <dependency org="com.google.collections" name="google-collections" rev="1.0" conf="*->*,!javadoc,!sources"/>
> + <dependency org="com.google.guava" name="guava" rev="r09" conf="*->*,!javadoc,!sources"/>
>
> <!-- test dependencies -->
>
>
> Added: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar?rev=1149420&view=auto
> ==============================================================================
> Binary file - no diff available.
>
> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
> ------------------------------------------------------------------------------
> svn:mime-type = application/octet-stream
>
> Added: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar?rev=1149420&view=auto
> ==============================================================================
> Binary file - no diff available.
>
> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
> ------------------------------------------------------------------------------
> svn:mime-type = application/octet-stream
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,41 @@
> +package org.apache.gora.cassandra.query;
> +
> +import org.apache.avro.Schema.Field;
> +
> +
> +/**
> + * Represents a unit of data: a key value pair tagged by a family name
> + */
> +public abstract class CassandraColumn {
> + public static final int SUB = 0;
> + public static final int SUPER = 1;
> +
> + private String family;
> + private int type;
> + private Field field;
> +
> + public String getFamily() {
> + return family;
> + }
> + public void setFamily(String family) {
> + this.family = family;
> + }
> + public int getType() {
> + return type;
> + }
> + public void setType(int type) {
> + this.type = type;
> + }
> + public void setField(Field field) {
> + this.field = field;
> + }
> +
> + protected Field getField() {
> + return this.field;
> + }
> +
> + public abstract String getName();
> + public abstract Object getValue();
> +
> +
> +}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java Fri Jul 22 00:33:59 2011
> @@ -1,93 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.gora.cassandra.query;
> -
> -import java.io.DataInput;
> -import java.io.DataOutput;
> -import java.io.IOException;
> -
> -import org.apache.gora.persistency.Persistent;
> -import org.apache.gora.query.Query;
> -import org.apache.gora.query.impl.PartitionQueryImpl;
> -import org.apache.hadoop.io.Text;
> -
> -public class CassandraPartitionQuery<K, T extends Persistent>
> -extends PartitionQueryImpl<K, T> {
> -
> - private String startToken;
> -
> - private String endToken;
> -
> - private String[] endPoints;
> -
> - private int splitSize;
> -
> - public CassandraPartitionQuery() {
> - this.dataStore = null;
> - }
> -
> - public CassandraPartitionQuery(Query<K, T> baseQuery, String startToken, String endToken, String[] endPoints,
> - int splitSize) {
> - super(baseQuery);
> - this.startToken = startToken;
> - this.endToken = endToken;
> - this.endPoints = endPoints;
> - this.splitSize = splitSize;
> - }
> -
> - @Override
> - public void write(DataOutput out) throws IOException {
> - super.write(out);
> - Text.writeString(out, startToken);
> - Text.writeString(out, endToken);
> - out.writeInt(endPoints.length);
> - for (String endPoint : endPoints) {
> - Text.writeString(out, endPoint);
> - }
> - out.writeInt(splitSize);
> - }
> -
> - @Override
> - public void readFields(DataInput in) throws IOException {
> - super.readFields(in);
> - startToken = Text.readString(in);
> - endToken = Text.readString(in);
> - int size = in.readInt();
> - endPoints = new String[size];
> - for (int i = 0; i < size; i++) {
> - endPoints[i] = Text.readString(in);
> - }
> - splitSize = in.readInt();
> - }
> -
> - public String getStartToken() {
> - return startToken;
> - }
> -
> - public String getEndToken() {
> - return endToken;
> - }
> -
> - public String[] getEndPoints() {
> - return endPoints;
> - }
> -
> - public int getSplitSize() {
> - return splitSize;
> - }
> -}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java Fri Jul 22 00:33:59 2011
> @@ -1,34 +1,55 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> package org.apache.gora.cassandra.query;
>
> +import java.util.List;
> +import java.util.Map;
> +
> import org.apache.gora.persistency.Persistent;
> +import org.apache.gora.query.Query;
> import org.apache.gora.query.impl.QueryBase;
> import org.apache.gora.store.DataStore;
>
> -public class CassandraQuery<K, T extends Persistent>
> -extends QueryBase<K, T> {
> +public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
>
> + private Query<K, T> query;
> +
> + /**
> + * Maps Avro fields to Cassandra columns.
> + */
> + private Map<String, List<String>> familyMap;
> +
> public CassandraQuery() {
> super(null);
> }
> -
> public CassandraQuery(DataStore<K, T> dataStore) {
> super(dataStore);
> }
> + public void setFamilyMap(Map<String, List<String>> familyMap) {
> + this.familyMap = familyMap;
> + }
> + public Map<String, List<String>> getFamilyMap() {
> + return familyMap;
> + }
> +
> + /**
> + * @param family the family name
> + * @return an array of the query column names belonging to the family
> + */
> + public String[] getColumns(String family) {
> +
> + List<String> columnList = familyMap.get(family);
> + String[] columns = new String[columnList.size()];
> + for (int i = 0; i < columns.length; ++i) {
> + columns[i] = columnList.get(i);
> + }
> + return columns;
> + }
> + public Query<K, T> getQuery() {
> + return query;
> + }
> + public void setQuery(Query<K, T> query) {
> + this.query = query;
> + }
> +
> +
> +
> }
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 22 00:33:59 2011
> @@ -1,157 +1,97 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> package org.apache.gora.cassandra.query;
>
> import java.io.IOException;
> -import java.net.InetAddress;
> -import java.net.UnknownHostException;
> -import java.util.Iterator;
> -
> -import org.apache.gora.cassandra.client.CassandraClient;
> -import org.apache.gora.cassandra.client.Row;
> -import org.apache.gora.cassandra.client.Select;
> -import org.apache.gora.cassandra.store.CassandraStore;
> +import java.util.List;
> +import java.util.Map;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
> import org.apache.gora.persistency.Persistent;
> import org.apache.gora.query.Query;
> import org.apache.gora.query.impl.ResultBase;
> import org.apache.gora.store.DataStore;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> -public class CassandraResult<K, T extends Persistent>
> -extends ResultBase<K, T> {
> -
> - private Iterator<Row> rowIter;
> -
> - private CassandraStore<K, T> store;
> +public class CassandraResult<K, T extends Persistent> extends ResultBase<K, T> {
> + public static final Logger LOG = LoggerFactory.getLogger(CassandraResult.class);
> +
> + private int rowNumber;
> +
> + private CassandraResultSet cassandraResultSet;
> +
> + /**
> + * Maps Cassandra columns to Avro fields.
> + */
> + private Map<String, String> reverseMap;
>
> - private String[] fields;
> -
> - public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query,
> - int batchRowCount) throws IOException {
> + public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query) {
> super(dataStore, query);
> -
> - store = (CassandraStore<K, T>) dataStore;
> - fields = query.getFields();
> -
> - boolean isUsingTokens = (query instanceof CassandraPartitionQuery);
> - String startTokenOrKey;
> - String endTokenOrKey;
> -
> - if (isUsingTokens) {
> - CassandraPartitionQuery<K, T> partitionQuery = (CassandraPartitionQuery<K, T>) query;
> - startTokenOrKey = partitionQuery.getStartToken();
> - endTokenOrKey = partitionQuery.getEndToken();
> - } else {
> - CassandraQuery<K, T> cassandraQuery = (CassandraQuery<K, T>) query;
> - startTokenOrKey = cassandraQuery.getStartKey().toString();
> - endTokenOrKey = cassandraQuery.getEndKey().toString();
> - }
> -
> - Select select = store.createSelect(fields);
> -
> - CassandraClient client = store.getClientByLocation(getLocation(query));
> - if (isUsingTokens) {
> - rowIter =
> - client.getTokenRange(startTokenOrKey, endTokenOrKey,
> - batchRowCount, select).iterator();
> - } else {
> - rowIter = client.getRange(startTokenOrKey, endTokenOrKey,
> - batchRowCount, select).iterator();
> - }
> - }
> -
> - @Override
> - public float getProgress() throws IOException {
> - return 0;
> }
>
> @Override
> protected boolean nextInner() throws IOException {
> - if (!rowIter.hasNext()) {
> - return false;
> - }
> - Row row = rowIter.next();
> - if (row == null) {
> - return false;
> + if (this.rowNumber < this.cassandraResultSet.size()) {
> + updatePersistent();
> }
> -
> - key = toKey(row.getKey());
> - persistent = store.newInstance(row, fields);
> -
> - return true;
> + ++this.rowNumber;
> + return (this.rowNumber <= this.cassandraResultSet.size());
> }
>
> +
> + /**
> + * Load key/value pair from Cassandra row to Avro record.
> + * @throws IOException
> + */
> @SuppressWarnings("unchecked")
> - private K toKey(String keyStr) {
> - Class<K> keyClass = dataStore.getKeyClass();
> - if (keyClass.isAssignableFrom(String.class)) {
> - return (K) keyStr;
> - }
> - if (keyClass.isAssignableFrom(Integer.class)) {
> - return (K) (Integer) Integer.parseInt(keyStr);
> - }
> - if (keyClass.isAssignableFrom(Float.class)) {
> - return (K) (Float) Float.parseFloat(keyStr);
> - }
> - if (keyClass.isAssignableFrom(Double.class)) {
> - return (K) (Double) Double.parseDouble(keyStr);
> - }
> - if (keyClass.isAssignableFrom(Long.class)) {
> - return (K) (Long) Long.parseLong(keyStr);
> - }
> - if (keyClass.isAssignableFrom(Short.class)) {
> - return (K) (Short) Short.parseShort(keyStr);
> - }
> - if (keyClass.isAssignableFrom(Byte.class)) {
> - return (K) (Byte) Byte.parseByte(keyStr);
> + private void updatePersistent() throws IOException {
> + CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
> +
> + // load key
> + this.key = (K) cassandraRow.getKey();
> +
> + // load value
> + Schema schema = this.persistent.getSchema();
> + List<Field> fields = schema.getFields();
> +
> + for (CassandraColumn cassandraColumn: cassandraRow) {
> +
> + // get field name
> + String family = cassandraColumn.getFamily();
> + String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
> +
> + // get field
> + int pos = this.persistent.getFieldIndex(fieldName);
> + Field field = fields.get(pos);
> +
> + // get value
> + cassandraColumn.setField(field);
> + Object value = cassandraColumn.getValue();
> +
> + this.persistent.put(pos, value);
> + // this field does not need to be written back to the store
> + this.persistent.clearDirty(pos);
> }
>
> - throw new RuntimeException("Can't parse " + keyStr +
> - " as an instance of " + keyClass);
> }
>
> @Override
> - public void close() throws IOException { }
> + public void close() throws IOException {
> + // TODO Auto-generated method stub
> +
> + }
>
> - private String getLocation(Query<K, T> query) {
> - if (!(query instanceof CassandraPartitionQuery)) {
> - return null;
> - }
> - CassandraPartitionQuery<K, T> partitonQuery =
> - (CassandraPartitionQuery<K, T>) query;
> - InetAddress[] localAddresses = new InetAddress[0];
> - try {
> - localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
> - } catch (UnknownHostException e) {
> - throw new AssertionError(e);
> - }
> - for (InetAddress address : localAddresses) {
> - for (String location : partitonQuery.getEndPoints()) {
> - InetAddress locationAddress = null;
> - try {
> - locationAddress = InetAddress.getByName(location);
> - } catch (UnknownHostException e) {
> - throw new AssertionError(e);
> - }
> - if (address.equals(locationAddress)) {
> - return location;
> - }
> - }
> - }
> - return partitonQuery.getEndPoints()[0];
> + @Override
> + public float getProgress() throws IOException {
> + return (((float) this.rowNumber) / this.cassandraResultSet.size());
> }
> -}
> \ No newline at end of file
> +
> + public void setResultSet(CassandraResultSet cassandraResultSet) {
> + this.cassandraResultSet = cassandraResultSet;
> + }
> +
> + public void setReverseMap(Map<String, String> reverseMap) {
> + this.reverseMap = reverseMap;
> + }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,36 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.ArrayList;
> +import java.util.HashMap;
> +
> +/**
> + * List data structure to keep the order coming from the Cassandra selects.
> + */
> +public class CassandraResultSet extends ArrayList<CassandraRow> {
> +
> + /**
> + *
> + */
> + private static final long serialVersionUID = -7620939600192859652L;
> +
> + /**
> + * Maps keys to indices in the list.
> + */
> + private HashMap<String, Integer> indexMap = new HashMap<String, Integer>();
> +
> + public CassandraRow getRow(String key) {
> + Integer integer = this.indexMap.get(key);
> + if (integer == null) {
> + return null;
> + }
> +
> + return this.get(integer);
> + }
> +
> + public void putRow(String key, CassandraRow cassandraRow) {
> + this.add(cassandraRow);
> + this.indexMap.put(key, this.size()-1);
> + }
> +
> +
> +}
> \ No newline at end of file
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,24 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.ArrayList;
> +
> +/**
> + * List of key value pairs representing a row, tagged by a key.
> + */
> +public class CassandraRow extends ArrayList<CassandraColumn> {
> +
> + /**
> + *
> + */
> + private static final long serialVersionUID = -7620939600192859652L;
> + private String key;
> +
> + public String getKey() {
> + return this.key;
> + }
> +
> + public void setKey(String key) {
> + this.key = key;
> + }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,104 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.nio.ByteBuffer;
> +import java.nio.CharBuffer;
> +import java.nio.charset.CharacterCodingException;
> +import java.nio.charset.Charset;
> +import java.nio.charset.CharsetEncoder;
> +
> +import me.prettyprint.hector.api.beans.HColumn;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
> +import org.apache.avro.Schema.Type;
> +import org.apache.avro.generic.GenericArray;
> +import org.apache.avro.generic.GenericData;
> +import org.apache.avro.util.Utf8;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraSubColumn extends CassandraColumn {
> + public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
> +
> + private static final String ENCODING = "UTF-8";
> +
> + private static CharsetEncoder charsetEncoder = Charset.forName(ENCODING).newEncoder();;
> +
> +
> + /**
> + * Key-value pair containing the raw data.
> + */
> + private HColumn<String, String> hColumn;
> +
> + public String getName() {
> + return hColumn.getName();
> + }
> +
> + /**
> + * Deserialize a String into an typed Object, according to the field schema.
> + * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
> + */
> + public Object getValue() {
> + Field field = getField();
> + Schema fieldSchema = field.schema();
> + Type type = fieldSchema.getType();
> + String valueString = hColumn.getValue();
> + Object value = null;
> +
> + switch (type) {
> + case STRING:
> + value = new Utf8(valueString);
> + break;
> + case BYTES:
> + // convert string to bytebuffer
> + value = getByteBuffer(valueString);
> + break;
> + case INT:
> + value = Integer.parseInt(valueString);
> + break;
> + case LONG:
> + value = Long.parseLong(valueString);
> + break;
> + case FLOAT:
> + value = Float.parseFloat(valueString);
> + break;
> + case ARRAY:
> + // convert string to array
> + valueString = valueString.substring(1, valueString.length()-1);
> + String[] elements = valueString.split(", ");
> +
> + Type elementType = fieldSchema.getElementType().getType();
> + if (elementType == Schema.Type.STRING) {
> + // the array type is String
> + GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
> + for (String element: elements) {
> + genericArray.add(element);
> + }
> +
> + value = genericArray;
> + } else {
> + LOG.info("Element type not supported: " + elementType);
> + }
> + break;
> + default:
> + LOG.info("Type not supported: " + type);
> + }
> +
> + return value;
> +
> + }
> +
> + public void setValue(HColumn<String, String> hColumn) {
> + this.hColumn = hColumn;
> + }
> +
> + public static ByteBuffer getByteBuffer(String valueString) {
> + ByteBuffer byteBuffer = null;
> + try {
> + byteBuffer = charsetEncoder.encode(CharBuffer.wrap(valueString));
> + } catch (CharacterCodingException cce) {
> + LOG.warn("Unable to encode " + valueString + " into " + ENCODING);
> + }
> + return byteBuffer;
> + }
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,102 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.Map;
> +
> +import me.prettyprint.hector.api.beans.HColumn;
> +import me.prettyprint.hector.api.beans.HSuperColumn;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
> +import org.apache.avro.Schema.Type;
> +import org.apache.avro.util.Utf8;
> +import org.apache.gora.persistency.StatefulHashMap;
> +import org.apache.gora.persistency.impl.PersistentBase;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraSuperColumn extends CassandraColumn {
> + public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
> +
> + private HSuperColumn<String, String, String> hSuperColumn;
> +
> + public String getName() {
> + return hSuperColumn.getName();
> + }
> +
> + public Object getValue() {
> + Field field = getField();
> + Schema fieldSchema = field.schema();
> + Type type = fieldSchema.getType();
> +
> + Object value = null;
> +
> + switch (type) {
> + case MAP:
> + Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
> + Type valueType = fieldSchema.getValueType().getType();
> +
> + for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
> + String memberString = hColumn.getValue();
> + Object memberValue = null;
> + switch (valueType) {
> + case STRING:
> + memberValue = new Utf8(memberString);
> + break;
> + case BYTES:
> + memberValue = CassandraSubColumn.getByteBuffer(memberString);
> + break;
> + default:
> + LOG.info("Type for the map value is not supported: " + valueType);
> +
> + }
> + map.put(new Utf8(hColumn.getName()), memberValue);
> + }
> + value = map;
> +
> + break;
> + case RECORD:
> + String fullName = fieldSchema.getFullName();
> +
> + Class<?> claz = null;
> + try {
> + claz = Class.forName(fullName);
> + } catch (ClassNotFoundException cnfe) {
> + LOG.warn("Unable to load class " + fullName, cnfe);
> + break;
> + }
> +
> + try {
> + value = claz.newInstance();
> + } catch (InstantiationException ie) {
> + LOG.warn("Instantiation error", ie);
> + break;
> + } catch (IllegalAccessException iae) {
> + LOG.warn("Illegal access error", iae);
> + break;
> + }
> +
> + // we updated the value instance, now update its members
> + if (value instanceof PersistentBase) {
> + PersistentBase record = (PersistentBase) value;
> +
> + for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
> + Field memberField = fieldSchema.getField(hColumn.getName());
> + CassandraSubColumn cassandraColumn = new CassandraSubColumn();
> + cassandraColumn.setField(memberField);
> + cassandraColumn.setValue(hColumn);
> + record.put(record.getFieldIndex(hColumn.getName()), cassandraColumn.getValue());
> + }
> + }
> + break;
> + default:
> + LOG.info("Type not supported: " + type);
> + }
> +
> + return value;
> + }
> +
> + public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
> + this.hSuperColumn = hSuperColumn;
> + }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,253 @@
> +package org.apache.gora.cassandra.store;
> +
> +import java.nio.ByteBuffer;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.HashMap;
> +import java.util.List;
> +import java.util.Map;
> +
> +import me.prettyprint.cassandra.serializers.StringSerializer;
> +import me.prettyprint.cassandra.service.CassandraHostConfigurator;
> +import me.prettyprint.hector.api.Cluster;
> +import me.prettyprint.hector.api.Keyspace;
> +import me.prettyprint.hector.api.Serializer;
> +import me.prettyprint.hector.api.beans.OrderedRows;
> +import me.prettyprint.hector.api.beans.OrderedSuperRows;
> +import me.prettyprint.hector.api.beans.Row;
> +import me.prettyprint.hector.api.beans.SuperRow;
> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
> +import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
> +import me.prettyprint.hector.api.factory.HFactory;
> +import me.prettyprint.hector.api.mutation.Mutator;
> +import me.prettyprint.hector.api.query.QueryResult;
> +import me.prettyprint.hector.api.query.RangeSlicesQuery;
> +import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
> +
> +import org.apache.gora.cassandra.query.CassandraQuery;
> +import org.apache.gora.mapreduce.GoraRecordReader;
> +import org.apache.gora.persistency.Persistent;
> +import org.apache.gora.query.Query;
> +import org.apache.gora.util.ByteUtils;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraClient<K, T extends Persistent> {
> + public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
> +
> + private Cluster cluster;
> + private Keyspace keyspace;
> + private Mutator<String> mutator;
> +
> + private CassandraMapping cassandraMapping = new CassandraMapping();
> +
> + private Serializer<String> stringSerializer = new StringSerializer();
> +
> + public void init() throws Exception {
> + this.cassandraMapping.loadConfiguration();
> + this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
> +
> + // add keyspace to cluster
> + checkKeyspace();
> +
> + // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
> + this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
> +
> + this.mutator = HFactory.createMutator(this.keyspace, this.stringSerializer);
> + }
> +
> + /**
> + * Check if keyspace already exists. If not, create it.
> + */
> + public void checkKeyspace() {
> + // "describe keyspace <keyspaceName>;" query
> + KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
> + if (keyspaceDefinition == null) {
> + List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
> + keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
> + this.cluster.addKeyspace(keyspaceDefinition);
> + LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
> +
> + keyspaceDefinition = null;
> + }
> +
> +
> + }
> +
> + /**
> + * Drop keyspace.
> + */
> + public void dropKeyspace() {
> + // "drop keyspace <keyspaceName>;" query
> + this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
> + }
> +
> + /**
> + * Insert a field in a column.
> + * @param key the row key
> + * @param fieldName the field name
> + * @param value the field value.
> + */
> + public void addColumn(String key, String fieldName, Object value) {
> + if (value == null) {
> + return;
> + }
> + if (value instanceof ByteBuffer) {
> + value = toString((ByteBuffer) value);
> + }
> +
> + String columnFamily = this.cassandraMapping.getFamily(fieldName);
> + String columnName = this.cassandraMapping.getColumn(fieldName);
> +
> + this.mutator.insert(key, columnFamily, HFactory.createStringColumn(columnName, value.toString()));
> + }
> +
> + /**
> + * TODO do not convert bytes to string to store a binary field
> + * @param value
> + * @return
> + */
> + private static String toString(ByteBuffer value) {
> + ByteBuffer byteBuffer = (ByteBuffer) value;
> + return ByteUtils.toString(byteBuffer.array(), 0, byteBuffer.limit());
> + }
> +
> + /**
> + * Insert a member in a super column. This is used for map and record Avro types.
> + * @param key the row key
> + * @param fieldName the field name
> + * @param memberName the member name
> + * @param value the member value
> + */
> + public void addSubColumn(String key, String fieldName, String memberName, Object value) {
> + if (value == null) {
> + return;
> + }
> +
> + if (value instanceof ByteBuffer) {
> + value = toString((ByteBuffer) value);
> + }
> +
> + String columnFamily = this.cassandraMapping.getFamily(fieldName);
> + String superColumnName = this.cassandraMapping.getColumn(fieldName);
> +
> + this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createStringColumn(memberName, value.toString())), this.stringSerializer, this.stringSerializer, this.stringSerializer));
> +
> + }
> +
> + /**
> + * Select a family column in the keyspace.
> + * @param cassandraQuery a wrapper of the query
> + * @param family the family name to be queried
> + * @return a list of family rows
> + */
> + public List<Row<String, String, String>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
> +
> + String[] columnNames = cassandraQuery.getColumns(family);
> + Query<K, T> query = cassandraQuery.getQuery();
> + int limit = (int) query.getLimit();
> + String startKey = (String) query.getStartKey();
> + String endKey = (String) query.getEndKey();
> +
> + if (startKey == null) {
> + startKey = "";
> + }
> + if (endKey == null) {
> + endKey = "";
> + }
> +
> +
> + RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.stringSerializer, stringSerializer, stringSerializer);
> + rangeSlicesQuery.setColumnFamily(family);
> + rangeSlicesQuery.setKeys(startKey, endKey);
> + rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
> + rangeSlicesQuery.setRowCount(limit);
> + rangeSlicesQuery.setColumnNames(columnNames);
> +
> +
> + QueryResult<OrderedRows<String, String, String>> queryResult = rangeSlicesQuery.execute();
> + OrderedRows<String, String, String> orderedRows = queryResult.get();
> +
> +
> + return orderedRows.getList();
> + }
> +
> + /**
> + * Select the families that contain at least one column mapped to a query field.
> + * @param query indicates the columns to select
> + * @return a map whose keys are the family names and whose values are the corresponding column names required to get all the query fields.
> + */
> + public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
> + Map<String, List<String>> map = new HashMap<String, List<String>>();
> + for (String field: query.getFields()) {
> + String family = this.cassandraMapping.getFamily(field);
> + String column = this.cassandraMapping.getColumn(field);
> +
> + // check if the family value was already initialized
> + List<String> list = map.get(family);
> + if (list == null) {
> + list = new ArrayList<String>();
> + map.put(family, list);
> + }
> +
> + if (column != null) {
> + list.add(column);
> + }
> +
> + }
> +
> + return map;
> + }
> +
> + /**
> + * Select the field names according to the column names, whose format is fully qualified: "family:column"
> + * @param query
> + * @return a map whose keys are the fully qualified column names and whose values are the query fields
> + */
> + public Map<String, String> getReverseMap(Query<K, T> query) {
> + Map<String, String> map = new HashMap<String, String>();
> + for (String field: query.getFields()) {
> + String family = this.cassandraMapping.getFamily(field);
> + String column = this.cassandraMapping.getColumn(field);
> +
> + map.put(family + ":" + column, field);
> + }
> +
> + return map;
> +
> + }
> +
> + public boolean isSuper(String family) {
> + return this.cassandraMapping.isSuper(family);
> + }
> +
> + public List<SuperRow<String, String, String, String>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
> + String[] columnNames = cassandraQuery.getColumns(family);
> + Query<K, T> query = cassandraQuery.getQuery();
> + int limit = (int) query.getLimit();
> + String startKey = (String) query.getStartKey();
> + String endKey = (String) query.getEndKey();
> +
> + if (startKey == null) {
> + startKey = "";
> + }
> + if (endKey == null) {
> + endKey = "";
> + }
> +
> +
> + RangeSuperSlicesQuery<String, String, String, String> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.stringSerializer, this.stringSerializer, this.stringSerializer, this.stringSerializer);
> + rangeSuperSlicesQuery.setColumnFamily(family);
> + rangeSuperSlicesQuery.setKeys(startKey, endKey);
> + rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
> + rangeSuperSlicesQuery.setRowCount(limit);
> + rangeSuperSlicesQuery.setColumnNames(columnNames);
> +
> +
> + QueryResult<OrderedSuperRows<String, String, String, String>> queryResult = rangeSuperSlicesQuery.execute();
> + OrderedSuperRows<String, String, String, String> orderedRows = queryResult.get();
> + return orderedRows.getList();
> +
> +
> + }
> +}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jul 22 00:33:59 2011
> @@ -1,50 +1,155 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> package org.apache.gora.cassandra.store;
>
> +import java.io.IOException;
> +import java.util.ArrayList;
> import java.util.HashMap;
> +import java.util.List;
> import java.util.Map;
> -import java.util.Set;
> +
> +import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
> +import me.prettyprint.cassandra.service.ThriftCfDef;
> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
> +import me.prettyprint.hector.api.ddl.ColumnType;
> +import me.prettyprint.hector.api.ddl.ComparatorType;
> +
> +import org.jdom.Document;
> +import org.jdom.Element;
> +import org.jdom.JDOMException;
> +import org.jdom.input.SAXBuilder;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> public class CassandraMapping {
> +
> + public static final Logger LOG = LoggerFactory.getLogger(CassandraMapping.class);
> +
> + private static final String MAPPING_FILE = "gora-cassandra-mapping.xml";
> + private static final String KEYSPACE_ELEMENT = "keyspace";
> + private static final String NAME_ATTRIBUTE = "name";
> + private static final String MAPPING_ELEMENT = "class";
> + private static final String COLUMN_ATTRIBUTE = "qualifier";
> + private static final String FAMILY_ATTRIBUTE = "family";
> + private static final String SUPER_ATTRIBUTE = "type";
> + private static final String CLUSTER_ATTRIBUTE = "cluster";
> + private static final String HOST_ATTRIBUTE = "host";
> +
> +
> + private String hostName;
> + private String clusterName;
> + private String keyspaceName;
> +
> +
> + /**
> + * List of the super column families.
> + */
> + private List<String> superFamilies = new ArrayList<String>();
> +
> + /**
> + * Look up the column family associated to the Avro field.
> + */
> + private Map<String, String> familyMap = new HashMap<String, String>();
> +
> + /**
> + * Look up the column associated to the Avro field.
> + */
> + private Map<String, String> columnMap = new HashMap<String, String>();
> +
> + /**
> + * Look up the column family from its name.
> + */
> + private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = new HashMap<String, BasicColumnFamilyDefinition>();
>
> - private String keySpace;
> + public String getHostName() {
> + return this.hostName;
> + }
>
> - private Map<String, Boolean> families =
> - new HashMap<String, Boolean>();
> + public String getClusterName() {
> + return this.clusterName;
> + }
>
> - public String getKeySpace() {
> - return keySpace;
> + public String getKeyspaceName() {
> + return this.keyspaceName;
> }
>
> - public void setKeySpace(String keySpace) {
> - this.keySpace = keySpace;
> +
> + @SuppressWarnings("unchecked")
> + public void loadConfiguration() throws JDOMException, IOException {
> + SAXBuilder saxBuilder = new SAXBuilder();
> + Document document = saxBuilder.build(getClass().getClassLoader().getResourceAsStream(MAPPING_FILE));
> + Element root = document.getRootElement();
> +
> + Element keyspace = root.getChild(KEYSPACE_ELEMENT);
> + this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
> + this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
> + this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
> +
> + // load column family definitions
> + List<Element> elements = keyspace.getChildren();
> + for (Element element: elements) {
> + BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition();
> +
> + String familyName = element.getAttributeValue(NAME_ATTRIBUTE);
> +
> + String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
> + if (superAttribute != null) {
> + this.superFamilies.add(familyName);
> + cfDef.setColumnType(ColumnType.SUPER);
> + cfDef.setSubComparatorType(ComparatorType.UTF8TYPE);
> + }
> +
> + cfDef.setKeyspaceName(this.keyspaceName);
> + cfDef.setName(familyName);
> + cfDef.setComparatorType(ComparatorType.UTF8TYPE);
> + cfDef.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
> +
> + this.columnFamilyDefinitions.put(familyName, cfDef);
> +
> + }
> +
> + // load column definitions
> + Element mapping = root.getChild(MAPPING_ELEMENT);
> + elements = mapping.getChildren();
> + for (Element element: elements) {
> + String fieldName = element.getAttributeValue(NAME_ATTRIBUTE);
> + String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
> + String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
> + BasicColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(familyName);
> + if (columnFamilyDefinition == null) {
> + LOG.warn("Family " + familyName + " was not declared in the keyspace.");
> + }
> +
> + this.familyMap.put(fieldName, familyName);
> + this.columnMap.put(fieldName, columnName);
> +
> + }
> }
>
> - public Set<String> getColumnFamilies() {
> - return families.keySet();
> + public String getFamily(String name) {
> + return this.familyMap.get(name);
> }
>
> - public void addColumnFamily(String columnFamily, boolean isSuper) {
> - families.put(columnFamily, isSuper);
> + public String getColumn(String name) {
> + return this.columnMap.get(name);
> }
>
> - public boolean isColumnFamilySuper(String columnFamily) {
> - return families.get(columnFamily);
> + /**
> + * Read family super attribute.
> + * @param family the family name
> + * @return true if the family is a super column family
> + */
> + public boolean isSuper(String family) {
> + return this.superFamilies.indexOf(family) != -1;
> }
> +
> + public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() {
> + List<ColumnFamilyDefinition> list = new ArrayList<ColumnFamilyDefinition>();
> + for (String key: this.columnFamilyDefinitions.keySet()) {
> + ColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(key);
> + ThriftCfDef thriftCfDef = new ThriftCfDef(columnFamilyDefinition);
> + list.add(thriftCfDef);
> + }
> +
> + return list;
> + }
> +
> }
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 22 00:33:59 2011
> @@ -1,465 +1,334 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> package org.apache.gora.cassandra.store;
>
> import java.io.IOException;
> -import java.net.InetAddress;
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> -import java.util.Map.Entry;
> -import java.util.Properties;
> -import java.util.Set;
> +
> +import me.prettyprint.hector.api.beans.ColumnSlice;
> +import me.prettyprint.hector.api.beans.HColumn;
> +import me.prettyprint.hector.api.beans.HSuperColumn;
> +import me.prettyprint.hector.api.beans.Row;
> +import me.prettyprint.hector.api.beans.SuperRow;
> +import me.prettyprint.hector.api.beans.SuperSlice;
>
> import org.apache.avro.Schema;
> import org.apache.avro.Schema.Field;
> import org.apache.avro.Schema.Type;
> import org.apache.avro.generic.GenericArray;
> import org.apache.avro.util.Utf8;
> -import org.apache.cassandra.thrift.TokenRange;
> -import org.apache.gora.cassandra.client.CassandraClient;
> -import org.apache.gora.cassandra.client.Mutate;
> -import org.apache.gora.cassandra.client.Row;
> -import org.apache.gora.cassandra.client.Select;
> -import org.apache.gora.cassandra.client.SimpleCassandraClient;
> -import org.apache.gora.cassandra.query.CassandraPartitionQuery;
> import org.apache.gora.cassandra.query.CassandraQuery;
> import org.apache.gora.cassandra.query.CassandraResult;
> -import org.apache.gora.persistency.ListGenericArray;
> +import org.apache.gora.cassandra.query.CassandraResultSet;
> +import org.apache.gora.cassandra.query.CassandraRow;
> +import org.apache.gora.cassandra.query.CassandraSubColumn;
> +import org.apache.gora.cassandra.query.CassandraSuperColumn;
> import org.apache.gora.persistency.Persistent;
> -import org.apache.gora.persistency.State;
> -import org.apache.gora.persistency.StateManager;
> import org.apache.gora.persistency.StatefulHashMap;
> -import org.apache.gora.persistency.StatefulMap;
> +import org.apache.gora.persistency.impl.PersistentBase;
> +import org.apache.gora.persistency.impl.StateManagerImpl;
> import org.apache.gora.query.PartitionQuery;
> import org.apache.gora.query.Query;
> import org.apache.gora.query.Result;
> -import org.apache.gora.store.DataStoreFactory;
> +import org.apache.gora.query.impl.PartitionQueryImpl;
> import org.apache.gora.store.impl.DataStoreBase;
> -import org.apache.gora.util.ByteUtils;
> -import org.jdom.Document;
> -import org.jdom.Element;
> -import org.jdom.input.SAXBuilder;
> -
> -/**
> - * DataStore for Cassandra.
> - *
> - * <p> Note: CassandraStore is not thread-safe. </p>
> - */
> -public class CassandraStore<K, T extends Persistent>
> -extends DataStoreBase<K, T> {
> -
> - private static final String ERROR_MESSAGE =
> - "Cassandra does not support creating or modifying ColumnFamilies during runtime";
> -
> - private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml";
> -
> - private static final int SPLIT_SIZE = 65536;
> -
> - private static final int BATCH_COUNT = 256;
> -
> - private CassandraClient client;
> -
> - private Map<String, CassandraColumn> columnMap;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> - private CassandraMapping mapping;
> -
> - @Override
> - public void initialize(Class<K> keyClass, Class<T> persistentClass,
> - Properties properties) throws IOException {
> - super.initialize(keyClass, persistentClass, properties);
> -
> - String mappingFile =
> - DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
> -
> - readMapping(mappingFile);
> +public class CassandraStore<K, T extends Persistent> extends DataStoreBase<K, T> {
> + public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
> +
> + private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
> +
> + /**
> + * The values are Avro fields pending to be stored.
> + */
> + private Map<K, T> buffer = new HashMap<K, T>();
> +
> + public CassandraStore() throws Exception {
> + this.cassandraClient.init();
> }
>
> @Override
> - public String getSchemaName() {
> - return mapping.getKeySpace();
> + public void close() throws IOException {
> + LOG.debug("close");
> + flush();
> }
>
> @Override
> - public void createSchema() throws IOException {
> - throw new UnsupportedOperationException(ERROR_MESSAGE);
> + public void createSchema() {
> + LOG.debug("create schema");
> + this.cassandraClient.checkKeyspace();
> }
>
> @Override
> - public void deleteSchema() throws IOException {
> - throw new UnsupportedOperationException(ERROR_MESSAGE);
> + public boolean delete(K key) throws IOException {
> + LOG.debug("delete " + key);
> + return false;
> }
>
> @Override
> - public boolean schemaExists() throws IOException {
> - return true;
> + public long deleteByQuery(Query<K, T> query) throws IOException {
> + LOG.debug("delete by query " + query);
> + return 0;
> }
>
> - public CassandraClient getClientByLocation(String endPoint) {
> - return client;
> + @Override
> + public void deleteSchema() throws IOException {
> + LOG.debug("delete schema");
> + this.cassandraClient.dropKeyspace();
> }
>
> - public Select createSelect(String[] fields) {
> - Select select = new Select();
> - if (fields == null) {
> - fields = beanFactory.getCachedPersistent().getFields();
> - }
> - for (String f : fields) {
> - CassandraColumn col = columnMap.get(f);
> - Schema fieldSchema = fieldMap.get(f).schema();
> - switch (fieldSchema.getType()) {
> - case MAP:
> - case ARRAY:
> - if (col.isSuperColumn()) {
> - select.addAllColumnsForSuperColumn(col.family, col.superColumn);
> - } else {
> - select.addColumnAll(col.family);
> - }
> - break;
> - default:
> - if (col.isSuperColumn()) {
> - select.addColumnName(col.family, col.superColumn, col.column);
> - } else {
> - select.addColumnName(col.family, col.column);
> - }
> - break;
> + @Override
> + public Result<K, T> execute(Query<K, T> query) throws IOException {
> +
> + Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
> + Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
> +
> + CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
> + cassandraQuery.setQuery(query);
> + cassandraQuery.setFamilyMap(familyMap);
> +
> + CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
> + cassandraResult.setReverseMap(reverseMap);
> +
> + CassandraResultSet cassandraResultSet = new CassandraResultSet();
> +
> + // We query Cassandra keyspace by families.
> + for (String family : familyMap.keySet()) {
> + if (this.cassandraClient.isSuper(family)) {
> + addSuperColumns(family, cassandraQuery, cassandraResultSet);
> +
> + } else {
> + addSubColumns(family, cassandraQuery, cassandraResultSet);
> +
> }
> +
> }
> - return select;
> - }
> +
> + cassandraResult.setResultSet(cassandraResultSet);
> +
> +
> + return cassandraResult;
>
> - @Override
> - public T get(K key, String[] fields) throws IOException {
> - if (fields == null) {
> - fields = beanFactory.getCachedPersistent().getFields();
> - }
> - Select select = createSelect(fields);
> - try {
> - Row result = client.get(key.toString(), select);
> - return newInstance(result, fields);
> - } catch (Exception e) {
> - throw new IOException(e);
> - }
> }
>
> - @SuppressWarnings("rawtypes")
> - private void setField(T persistent, Field field, StatefulMap map) {
> - persistent.put(field.pos(), map);
> - }
> -
> - private void setField(T persistent, Field field, byte[] val)
> - throws IOException {
> - persistent.put(field.pos()
> - , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos())));
> - }
> -
> - @SuppressWarnings("rawtypes")
> - private void setField(T persistent, Field field, GenericArray list) {
> - persistent.put(field.pos(), list);
> - }
> -
> - @SuppressWarnings({ "rawtypes", "unchecked" })
> - public T newInstance(Row result, String[] fields)
> - throws IOException {
> - if(result == null)
> - return null;
> -
> - T persistent = newPersistent();
> - StateManager stateManager = persistent.getStateManager();
> - for (String f : fields) {
> - CassandraColumn col = columnMap.get(f);
> - Field field = fieldMap.get(f);
> - Schema fieldSchema = field.schema();
> - Map<String, byte[]> qualMap;
> - switch(fieldSchema.getType()) {
> - case MAP:
> - if (col.isSuperColumn()) {
> - qualMap = result.getSuperColumn(col.family, col.superColumn);
> - } else {
> - qualMap = result.getColumn(col.family);
> - }
> - if (qualMap == null) {
> - continue;
> - }
> - Schema valueSchema = fieldSchema.getValueType();
> - StatefulMap map = new StatefulHashMap();
> - for (Entry<String, byte[]> e : qualMap.entrySet()) {
> - Utf8 mapKey = new Utf8(e.getKey());
> - map.put(mapKey, ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
> - map.putState(mapKey, State.CLEAN);
> - }
> - setField(persistent, field, map);
> - break;
> - case ARRAY:
> - if (col.isSuperColumn()) {
> - qualMap = result.getSuperColumn(col.family, col.superColumn);
> - } else {
> - qualMap = result.getColumn(col.family);
> - }
> - if (qualMap == null) {
> - continue;
> - }
> - valueSchema = fieldSchema.getElementType();
> - ArrayList arrayList = new ArrayList();
> - for (Entry<String, byte[]> e : qualMap.entrySet()) {
> - arrayList.add(ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
> - }
> - ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
> - setField(persistent, field, arr);
> - break;
> - default:
> - byte[] val;
> - if (col.isSuperColumn()) {
> - val = result.get(col.family, col.superColumn, col.column);
> - } else {
> - val = result.get(col.family, col.column);
> - }
> - if (val == null) {
> - continue;
> - }
> - setField(persistent, field, val);
> - break;
> + private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
> + CassandraResultSet cassandraResultSet) {
> + // select family columns that are included in the query
> + List<Row<String, String, String>> rows = this.cassandraClient.execute(cassandraQuery, family);
> +
> + for (Row<String, String, String> row : rows) {
> + String key = row.getKey();
> +
> + // find associated row in the resultset
> + CassandraRow cassandraRow = cassandraResultSet.getRow(key);
> + if (cassandraRow == null) {
> + cassandraRow = new CassandraRow();
> + cassandraResultSet.putRow(key, cassandraRow);
> + cassandraRow.setKey(key);
> }
> +
> + ColumnSlice<String, String> columnSlice = row.getColumnSlice();
> +
> + for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
> + CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
> + cassandraSubColumn.setValue(hColumn);
> + cassandraSubColumn.setFamily(family);
> + cassandraRow.add(cassandraSubColumn);
> + }
> +
> }
> - stateManager.clearDirty(persistent);
> - return persistent;
> }
>
> - @Override
> - public void put(K key, T obj) throws IOException {
> - Mutate mutate = new Mutate();
> - Schema schema = obj.getSchema();
> - StateManager stateManager = obj.getStateManager();
> - List<Field> fields = schema.getFields();
> - String qual;
> - byte[] value;
> - for (int i = 0; i < fields.size(); i++) {
> - if (!stateManager.isDirty(obj, i)) {
> - continue;
> + private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
> + CassandraResultSet cassandraResultSet) {
> +
> + List<SuperRow<String, String, String, String>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
> + for (SuperRow<String, String, String, String> superRow: superRows) {
> + String key = superRow.getKey();
> + CassandraRow cassandraRow = cassandraResultSet.getRow(key);
> + if (cassandraRow == null) {
> + cassandraRow = new CassandraRow();
> + cassandraResultSet.putRow(key, cassandraRow);
> + cassandraRow.setKey(key);
> }
> - Field field = fields.get(i);
> - Type type = field.schema().getType();
> - Object o = obj.get(i);
> - CassandraColumn col = columnMap.get(field.name());
> -
> - switch(type) {
> - case MAP:
> - if(o instanceof StatefulMap) {
> - @SuppressWarnings("unchecked")
> - StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) o;
> - for (Entry<Utf8, State> e : map.states().entrySet()) {
> - Utf8 mapKey = e.getKey();
> - switch (e.getValue()) {
> - case DIRTY:
> - qual = mapKey.toString();
> - value = ByteUtils.toBytes(map.get(mapKey), field.schema().getValueType(), datumWriter);
> - if (col.isSuperColumn()) {
> - mutate.put(col.family, col.superColumn, qual, value);
> - } else {
> - mutate.put(col.family, qual, value);
> - }
> - break;
> - case DELETED:
> - qual = mapKey.toString();
> - if (col.isSuperColumn()) {
> - mutate.delete(col.family, col.superColumn, qual);
> - } else {
> - mutate.delete(col.family, qual);
> - }
> - break;
> - }
> - }
> - } else {
> - @SuppressWarnings({ "rawtypes", "unchecked" })
> - Set<Map.Entry> set = ((Map)o).entrySet();
> - for(@SuppressWarnings("rawtypes") Entry entry: set) {
> - qual = entry.getKey().toString();
> - value = ByteUtils.toBytes(entry.getValue().toString());
> - if (col.isSuperColumn()) {
> - mutate.put(col.family, col.superColumn, qual, value);
> - } else {
> - mutate.put(col.family, qual, value);
> - }
> - }
> - }
> - break;
> - case ARRAY:
> - if(o instanceof GenericArray) {
> - @SuppressWarnings("rawtypes")
> - GenericArray arr = (GenericArray) o;
> - int j=0;
> - for(Object item : arr) {
> - value = ByteUtils.toBytes(item.toString());
> - if (col.isSuperColumn()) {
> - mutate.put(col.family, col.superColumn, Integer.toString(j), value);
> - } else {
> - mutate.put(col.family, Integer.toString(j), value);
> - }
> - j++;
> - }
> - }
> - break;
> - default:
> - value = ByteUtils.toBytes(o, field.schema(), datumWriter);
> - if (col.isSuperColumn()) {
> - mutate.put(col.family, col.superColumn, col.column, value);
> - } else {
> - mutate.put(col.family, col.column, value);
> - }
> - break;
> +
> + SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
> + for (HSuperColumn<String, String, String> hSuperColumn: superSlice.getSuperColumns()) {
> + CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
> + cassandraSuperColumn.setValue(hSuperColumn);
> + cassandraSuperColumn.setFamily(family);
> + cassandraRow.add(cassandraSuperColumn);
> }
> }
> -
> - if(!mutate.isEmpty())
> - client.mutate(key.toString(), mutate);
> }
>
> - @Override
> - public boolean delete(K key) throws IOException {
> - Mutate mutate = new Mutate();
> - for (String family : mapping.getColumnFamilies()) {
> - mutate.deleteAll(family);
> + /**
> + * Flush the buffer. Write the buffered rows.
> + * @see org.apache.gora.store.DataStore#flush()
> + */
> + @Override
> + public void flush() throws IOException {
> + for (K key: this.buffer.keySet()) {
> + T value = this.buffer.get(key);
> + Schema schema = value.getSchema();
> + for (Field field: schema.getFields()) {
> + if (value.isDirty(field.pos())) {
> + addOrUpdateField((String) key, field, value.get(field.pos()));
> + }
> + }
> }
> -
> - client.mutate(key.toString(), mutate);
> - return true;
> +
> + this.buffer.clear();
> }
>
> @Override
> - public void flush() throws IOException { }
> -
> - @Override
> - public void close() throws IOException {
> - client.close();
> + public T get(K key, String[] fields) throws IOException {
> + LOG.info("get " + key);
> + return null;
> }
>
> @Override
> - public Query<K, T> newQuery() {
> - return new CassandraQuery<K, T>(this);
> + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
> + throws IOException {
> + // just a single partition
> + List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
> + partitions.add(new PartitionQueryImpl<K,T>(query));
> + return partitions;
> }
>
> @Override
> - public long deleteByQuery(Query<K, T> query) throws IOException {
> - // TODO Auto-generated method stub
> - return 0;
> + public String getSchemaName() {
> + LOG.info("get schema name");
> + return null;
> }
>
> @Override
> - public Result<K, T> execute(Query<K, T> query) throws IOException {
> - return new CassandraResult<K, T>(this, query, BATCH_COUNT);
> + public Query<K, T> newQuery() {
> + return new CassandraQuery<K, T>(this);
> }
>
> - @Override
> - public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
> - throws IOException {
> - List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
> -
> - List<TokenRange> rangeList = client.describeRing();
> - for (TokenRange range : rangeList) {
> - List<String> tokens =
> - client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE);
> - // turn the sub-ranges into InputSplits
> - String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
> - // hadoop needs hostname, not ip
> - for (int i = 0; i < endpoints.length; i++) {
> - endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
> - }
> -
> - for (int i = 1; i < tokens.size(); i++) {
> - CassandraPartitionQuery<K, T> partitionQuery =
> - new CassandraPartitionQuery<K, T>(query, tokens.get(i - 1), tokens.get(i), endpoints, SPLIT_SIZE);
> - partitions.add(partitionQuery);
> + /**
> + * Duplicate instance to keep all the objects in memory till flushing.
> + * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
> + */
> + @Override
> + public void put(K key, T value) throws IOException {
> + T p = (T) value.newInstance(new StateManagerImpl());
> + Schema schema = value.getSchema();
> + for (Field field: schema.getFields()) {
> + if (value.isDirty(field.pos())) {
> + Object fieldValue = value.get(field.pos());
> +
> + // check if field has a nested structure (map or record)
> + Schema fieldSchema = field.schema();
> + Type type = fieldSchema.getType();
> + switch(type) {
> + case RECORD:
> + Persistent persistent = (Persistent) fieldValue;
> + Persistent newRecord = persistent.newInstance(new StateManagerImpl());
> + for (Field member: fieldSchema.getFields()) {
> + newRecord.put(member.pos(), persistent.get(member.pos()));
> + }
> + fieldValue = newRecord;
> + break;
> + case MAP:
> + StatefulHashMap<?, ?> map = (StatefulHashMap<?, ?>) fieldValue;
> + StatefulHashMap<?, ?> newMap = new StatefulHashMap(map);
> + fieldValue = newMap;
> + break;
> + }
> +
> + p.put(field.pos(), fieldValue);
> }
> }
> - return partitions;
> - }
> -
> - private CassandraClient createClient() throws IOException {
> - String serverStr =
> - DataStoreFactory.findPropertyOrDie(properties, this, "servers");
> - String[] server1Parts = serverStr.split(",")[0].split(":");
> - try {
> - return new SimpleCassandraClient(server1Parts[0],
> - Integer.parseInt(server1Parts[1]), mapping.getKeySpace());
> - } catch (Exception e) {
> - throw new IOException(e);
> - }
> - }
> -
> - @SuppressWarnings("unchecked")
> - protected void readMapping(String filename) throws IOException {
> -
> - mapping = new CassandraMapping();
> - columnMap = new HashMap<String, CassandraColumn>();
> -
> - try {
> - SAXBuilder builder = new SAXBuilder();
> - Document doc = builder.build(getClass().getClassLoader()
> - .getResourceAsStream(filename));
> -
> - List<Element> classes = doc.getRootElement().getChildren("class");
> -
> - for(Element classElement: classes) {
> - if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
> - && classElement.getAttributeValue("name").equals(
> - persistentClass.getCanonicalName())) {
> -
> - String keySpace = classElement.getAttributeValue("keyspace");
> - mapping.setKeySpace(keySpace);
> - client = createClient();
> - Map<String, Map<String, String>> keySpaceDesc = client.describeKeySpace();
> - for (Entry<String, Map<String, String>> e : keySpaceDesc.entrySet()) {
> - boolean isSuper = e.getValue().get("Type").equals("Super");
> - mapping.addColumnFamily(e.getKey(), isSuper);
> - }
> -
> - List<Element> fields = classElement.getChildren("field");
> -
> - for(Element field:fields) {
> - String fieldName = field.getAttributeValue("name");
> - String path = field.getAttributeValue("path");
> - String[] parts = path.split(":");
> - String columnFamily = parts[0];
> - String superColumn = null;
> - String column = null;
> -
> - boolean isSuper = mapping.isColumnFamilySuper(columnFamily);
> - if (isSuper) {
> - superColumn = parts[1];
> - if (parts.length == 3) {
> - column = parts[2];
> +
> + this.buffer.put(key, p);
> + }
> +
> + /**
> + * Add a field to Cassandra according to its type.
> + * @param key the key of the row where the field should be added
> + * @param field the Avro field representing a datum
> + * @param value the field value
> + */
> + private void addOrUpdateField(String key, Field field, Object value) {
> + Schema schema = field.schema();
> + Type type = schema.getType();
> + //LOG.info(field.name() + " " + type.name());
> + switch (type) {
> + case STRING:
> + this.cassandraClient.addColumn(key, field.name(), value);
> + break;
> + case INT:
> + this.cassandraClient.addColumn(key, field.name(), value);
> + break;
> + case LONG:
> + this.cassandraClient.addColumn(key, field.name(), value);
> + break;
> + case BYTES:
> + this.cassandraClient.addColumn(key, field.name(), value);
> + break;
> + case FLOAT:
> + this.cassandraClient.addColumn(key, field.name(), value);
> + break;
> + case RECORD:
> + if (value != null) {
> + if (value instanceof PersistentBase) {
> + PersistentBase persistentBase = (PersistentBase) value;
> + for (Field member: schema.getFields()) {
> +
> + // TODO: hack, do not store empty arrays
> + Object memberValue = persistentBase.get(member.pos());
> + if (memberValue instanceof GenericArray<?>) {
> + GenericArray<String> array = (GenericArray<String>) memberValue;
> + if (array.size() == 0) {
> + continue;
> + }
> }
> - } else {
> - if (parts.length == 2) {
> - column = parts[1];
> +
> + this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
> + }
> + } else {
> + LOG.info("Record not supported: " + value.toString());
> +
> + }
> + }
> + break;
> + case MAP:
> + if (value != null) {
> + if (value instanceof StatefulHashMap<?, ?>) {
> + //TODO cast to stateful map and only write dirty keys
> + Map<Utf8, Object> map = (Map<Utf8, Object>) value;
> + for (Utf8 mapKey: map.keySet()) {
> +
> + // TODO: hack, do not store empty arrays
> + Object keyValue = map.get(mapKey);
> + if (keyValue instanceof GenericArray<?>) {
> + GenericArray<String> array = (GenericArray<String>) keyValue;
> + if (array.size() == 0) {
> + continue;
> + }
> }
> +
> + this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
> }
> -
> - columnMap.put(fieldName,
> - new CassandraColumn(columnFamily, superColumn, column));
> + } else {
> + LOG.info("Map not supported: " + value.toString());
> }
> -
> - break;
> }
> - }
> - } catch(Exception ex) {
> - throw new IOException(ex);
> + break;
> + default:
> + LOG.info("Type not considered: " + type.name());
> }
> }
> -}
> \ No newline at end of file
> +
> + @Override
> + public boolean schemaExists() throws IOException {
> + LOG.info("schema exists");
> + return false;
> + }
> +
> +}
>
> Modified: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (original)
> +++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Fri Jul 22 00:33:59 2011
> @@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
> public class GoraRecordReader<K, T extends Persistent> extends RecordReader<K,T> {
> public static final Logger LOG = LoggerFactory.getLogger(GoraRecordReader.class);
>
> - private static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
> - private static final int BUFFER_LIMIT_READ_VALUE = 10000;
> + public static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
> + public static final int BUFFER_LIMIT_READ_VALUE = 10000;
>
> protected Query<K,T> query;
> protected Result<K,T> result;
>
>
>
Re: svn commit: r1149420 - in /incubator/gora/trunk:
gora-cassandra/ivy/ gora-cassandra/lib-ext/ gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java
Posted by Alexis <al...@gmail.com>.
My bad. The headers should now be included in the new revision 1149425.
On Thu, Jul 21, 2011 at 5:39 PM, Henry Saputra <he...@gmail.com> wrote:
> Looks like some files are missing the Apache license header.
>
> - Henry
>
> On Thu, Jul 21, 2011 at 5:34 PM, <al...@apache.org> wrote:
>> Author: alexis
>> Date: Fri Jul 22 00:33:59 2011
>> New Revision: 1149420
>>
>> URL: http://svn.apache.org/viewvc?rev=1149420&view=rev
>> Log:
>> Cassandra 0.8 backend with Hector client
>>
>> Added:
>> incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar (with props)
>> incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar (with props)
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
>> Removed:
>> incubator/gora/trunk/gora-cassandra/lib-ext/apache-cassandra-0.6.4.jar
>> incubator/gora/trunk/gora-cassandra/lib-ext/libthrift-r917130.jar
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraColumn.java
>> Modified:
>> incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
>> incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
>> incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
>>
>> Modified: incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/ivy/ivy.xml?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/ivy/ivy.xml (original)
>> +++ incubator/gora/trunk/gora-cassandra/ivy/ivy.xml Fri Jul 22 00:33:59 2011
>> @@ -22,7 +22,7 @@
>> organisation="org.apache.gora"
>> module="gora-cassandra"
>> status="integration"/>
>> -
>> +
>> <configurations>
>> <include file="${project.dir}/ivy/ivy-configurations.xml"/>
>> </configurations>
>> @@ -32,15 +32,25 @@
>> <artifact name="gora-cassandra-test" conf="test"/>
>> </publications>
>>
>> +
>> <dependencies>
>> <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
>> - <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
>> - <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
>> -
>> - <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.8" conf="*->master"/>
>> - <dependency org="org.slf4j" name="slf4j-api" rev="1.5.8" conf="*->master"/>
>> -
>> - <dependency org="com.google.guava" name="guava" rev="r06"/>
>> +
>> + <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
>> +
>> + <dependency org="org.jdom" name="jdom" rev="1.1">
>> + <exclude org="xerces" name="xercesImpl"/>
>> + </dependency>
>> +
>> + <!--
>> + <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.8.1"/>
>> + <dependency org="me.prettyprint" name="hector" rev="0.8.0-1"/>
>> + -->
>> + <dependency org="org.apache.cassandra" name="cassandra-thrift" rev="0.8.1"/>
>> + <dependency org="com.ecyrd.speed4j" name="speed4j" rev="0.9" conf="*->*,!javadoc,!sources"/>
>> + <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="1.1.2" conf="*->*,!javadoc,!sources"/>
>> + <dependency org="com.google.collections" name="google-collections" rev="1.0" conf="*->*,!javadoc,!sources"/>
>> + <dependency org="com.google.guava" name="guava" rev="r09" conf="*->*,!javadoc,!sources"/>
>>
>> <!-- test dependencies -->
>>
>>
>> Added: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar?rev=1149420&view=auto
>> ==============================================================================
>> Binary file - no diff available.
>>
>> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
>> ------------------------------------------------------------------------------
>> svn:mime-type = application/octet-stream
>>
>> Added: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar?rev=1149420&view=auto
>> ==============================================================================
>> Binary file - no diff available.
>>
>> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
>> ------------------------------------------------------------------------------
>> svn:mime-type = application/octet-stream
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,41 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import org.apache.avro.Schema.Field;
>> +
>> +
>> +/**
>> + * Represents a unit of data: a key value pair tagged by a family name
>> + */
>> +public abstract class CassandraColumn {
>> + public static final int SUB = 0;
>> + public static final int SUPER = 1;
>> +
>> + private String family;
>> + private int type;
>> + private Field field;
>> +
>> + public String getFamily() {
>> + return family;
>> + }
>> + public void setFamily(String family) {
>> + this.family = family;
>> + }
>> + public int getType() {
>> + return type;
>> + }
>> + public void setType(int type) {
>> + this.type = type;
>> + }
>> + public void setField(Field field) {
>> + this.field = field;
>> + }
>> +
>> + protected Field getField() {
>> + return this.field;
>> + }
>> +
>> + public abstract String getName();
>> + public abstract Object getValue();
>> +
>> +
>> +}
>>
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java Fri Jul 22 00:33:59 2011
>> @@ -1,93 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.gora.cassandra.query;
>> -
>> -import java.io.DataInput;
>> -import java.io.DataOutput;
>> -import java.io.IOException;
>> -
>> -import org.apache.gora.persistency.Persistent;
>> -import org.apache.gora.query.Query;
>> -import org.apache.gora.query.impl.PartitionQueryImpl;
>> -import org.apache.hadoop.io.Text;
>> -
>> -public class CassandraPartitionQuery<K, T extends Persistent>
>> -extends PartitionQueryImpl<K, T> {
>> -
>> - private String startToken;
>> -
>> - private String endToken;
>> -
>> - private String[] endPoints;
>> -
>> - private int splitSize;
>> -
>> - public CassandraPartitionQuery() {
>> - this.dataStore = null;
>> - }
>> -
>> - public CassandraPartitionQuery(Query<K, T> baseQuery, String startToken, String endToken, String[] endPoints,
>> - int splitSize) {
>> - super(baseQuery);
>> - this.startToken = startToken;
>> - this.endToken = endToken;
>> - this.endPoints = endPoints;
>> - this.splitSize = splitSize;
>> - }
>> -
>> - @Override
>> - public void write(DataOutput out) throws IOException {
>> - super.write(out);
>> - Text.writeString(out, startToken);
>> - Text.writeString(out, endToken);
>> - out.writeInt(endPoints.length);
>> - for (String endPoint : endPoints) {
>> - Text.writeString(out, endPoint);
>> - }
>> - out.writeInt(splitSize);
>> - }
>> -
>> - @Override
>> - public void readFields(DataInput in) throws IOException {
>> - super.readFields(in);
>> - startToken = Text.readString(in);
>> - endToken = Text.readString(in);
>> - int size = in.readInt();
>> - endPoints = new String[size];
>> - for (int i = 0; i < size; i++) {
>> - endPoints[i] = Text.readString(in);
>> - }
>> - splitSize = in.readInt();
>> - }
>> -
>> - public String getStartToken() {
>> - return startToken;
>> - }
>> -
>> - public String getEndToken() {
>> - return endToken;
>> - }
>> -
>> - public String[] getEndPoints() {
>> - return endPoints;
>> - }
>> -
>> - public int getSplitSize() {
>> - return splitSize;
>> - }
>> -}
>>
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java Fri Jul 22 00:33:59 2011
>> @@ -1,34 +1,55 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> package org.apache.gora.cassandra.query;
>>
>> +import java.util.List;
>> +import java.util.Map;
>> +
>> import org.apache.gora.persistency.Persistent;
>> +import org.apache.gora.query.Query;
>> import org.apache.gora.query.impl.QueryBase;
>> import org.apache.gora.store.DataStore;
>>
>> -public class CassandraQuery<K, T extends Persistent>
>> -extends QueryBase<K, T> {
>> +public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
>>
>> + private Query<K, T> query;
>> +
>> + /**
>> + * Maps Avro fields to Cassandra columns.
>> + */
>> + private Map<String, List<String>> familyMap;
>> +
>> public CassandraQuery() {
>> super(null);
>> }
>> -
>> public CassandraQuery(DataStore<K, T> dataStore) {
>> super(dataStore);
>> }
>> + public void setFamilyMap(Map<String, List<String>> familyMap) {
>> + this.familyMap = familyMap;
>> + }
>> + public Map<String, List<String>> getFamilyMap() {
>> + return familyMap;
>> + }
>> +
>> + /**
>> + * @param family the family name
>> + * @return an array of the query column names belonging to the family
>> + */
>> + public String[] getColumns(String family) {
>> +
>> + List<String> columnList = familyMap.get(family);
>> + String[] columns = new String[columnList.size()];
>> + for (int i = 0; i < columns.length; ++i) {
>> + columns[i] = columnList.get(i);
>> + }
>> + return columns;
>> + }
>> + public Query<K, T> getQuery() {
>> + return query;
>> + }
>> + public void setQuery(Query<K, T> query) {
>> + this.query = query;
>> + }
>> +
>> +
>> +
>> }
>>
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 22 00:33:59 2011
>> @@ -1,157 +1,97 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> package org.apache.gora.cassandra.query;
>>
>> import java.io.IOException;
>> -import java.net.InetAddress;
>> -import java.net.UnknownHostException;
>> -import java.util.Iterator;
>> -
>> -import org.apache.gora.cassandra.client.CassandraClient;
>> -import org.apache.gora.cassandra.client.Row;
>> -import org.apache.gora.cassandra.client.Select;
>> -import org.apache.gora.cassandra.store.CassandraStore;
>> +import java.util.List;
>> +import java.util.Map;
>> +
>> +import org.apache.avro.Schema;
>> +import org.apache.avro.Schema.Field;
>> import org.apache.gora.persistency.Persistent;
>> import org.apache.gora.query.Query;
>> import org.apache.gora.query.impl.ResultBase;
>> import org.apache.gora.store.DataStore;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>>
>> -public class CassandraResult<K, T extends Persistent>
>> -extends ResultBase<K, T> {
>> -
>> - private Iterator<Row> rowIter;
>> -
>> - private CassandraStore<K, T> store;
>> +public class CassandraResult<K, T extends Persistent> extends ResultBase<K, T> {
>> + public static final Logger LOG = LoggerFactory.getLogger(CassandraResult.class);
>> +
>> + private int rowNumber;
>> +
>> + private CassandraResultSet cassandraResultSet;
>> +
>> + /**
>> + * Maps Cassandra columns to Avro fields.
>> + */
>> + private Map<String, String> reverseMap;
>>
>> - private String[] fields;
>> -
>> - public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query,
>> - int batchRowCount) throws IOException {
>> + public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query) {
>> super(dataStore, query);
>> -
>> - store = (CassandraStore<K, T>) dataStore;
>> - fields = query.getFields();
>> -
>> - boolean isUsingTokens = (query instanceof CassandraPartitionQuery);
>> - String startTokenOrKey;
>> - String endTokenOrKey;
>> -
>> - if (isUsingTokens) {
>> - CassandraPartitionQuery<K, T> partitionQuery = (CassandraPartitionQuery<K, T>) query;
>> - startTokenOrKey = partitionQuery.getStartToken();
>> - endTokenOrKey = partitionQuery.getEndToken();
>> - } else {
>> - CassandraQuery<K, T> cassandraQuery = (CassandraQuery<K, T>) query;
>> - startTokenOrKey = cassandraQuery.getStartKey().toString();
>> - endTokenOrKey = cassandraQuery.getEndKey().toString();
>> - }
>> -
>> - Select select = store.createSelect(fields);
>> -
>> - CassandraClient client = store.getClientByLocation(getLocation(query));
>> - if (isUsingTokens) {
>> - rowIter =
>> - client.getTokenRange(startTokenOrKey, endTokenOrKey,
>> - batchRowCount, select).iterator();
>> - } else {
>> - rowIter = client.getRange(startTokenOrKey, endTokenOrKey,
>> - batchRowCount, select).iterator();
>> - }
>> - }
>> -
>> - @Override
>> - public float getProgress() throws IOException {
>> - return 0;
>> }
>>
>> @Override
>> protected boolean nextInner() throws IOException {
>> - if (!rowIter.hasNext()) {
>> - return false;
>> - }
>> - Row row = rowIter.next();
>> - if (row == null) {
>> - return false;
>> + if (this.rowNumber < this.cassandraResultSet.size()) {
>> + updatePersistent();
>> }
>> -
>> - key = toKey(row.getKey());
>> - persistent = store.newInstance(row, fields);
>> -
>> - return true;
>> + ++this.rowNumber;
>> + return (this.rowNumber <= this.cassandraResultSet.size());
>> }
>>
>> +
>> + /**
>> + * Load key/value pair from Cassandra row to Avro record.
>> + * @throws IOException
>> + */
>> @SuppressWarnings("unchecked")
>> - private K toKey(String keyStr) {
>> - Class<K> keyClass = dataStore.getKeyClass();
>> - if (keyClass.isAssignableFrom(String.class)) {
>> - return (K) keyStr;
>> - }
>> - if (keyClass.isAssignableFrom(Integer.class)) {
>> - return (K) (Integer) Integer.parseInt(keyStr);
>> - }
>> - if (keyClass.isAssignableFrom(Float.class)) {
>> - return (K) (Float) Float.parseFloat(keyStr);
>> - }
>> - if (keyClass.isAssignableFrom(Double.class)) {
>> - return (K) (Double) Double.parseDouble(keyStr);
>> - }
>> - if (keyClass.isAssignableFrom(Long.class)) {
>> - return (K) (Long) Long.parseLong(keyStr);
>> - }
>> - if (keyClass.isAssignableFrom(Short.class)) {
>> - return (K) (Short) Short.parseShort(keyStr);
>> - }
>> - if (keyClass.isAssignableFrom(Byte.class)) {
>> - return (K) (Byte) Byte.parseByte(keyStr);
>> + private void updatePersistent() throws IOException {
>> + CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
>> +
>> + // load key
>> + this.key = (K) cassandraRow.getKey();
>> +
>> + // load value
>> + Schema schema = this.persistent.getSchema();
>> + List<Field> fields = schema.getFields();
>> +
>> + for (CassandraColumn cassandraColumn: cassandraRow) {
>> +
>> + // get field name
>> + String family = cassandraColumn.getFamily();
>> + String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
>> +
>> + // get field
>> + int pos = this.persistent.getFieldIndex(fieldName);
>> + Field field = fields.get(pos);
>> +
>> + // get value
>> + cassandraColumn.setField(field);
>> + Object value = cassandraColumn.getValue();
>> +
>> + this.persistent.put(pos, value);
>> + // this field does not need to be written back to the store
>> + this.persistent.clearDirty(pos);
>> }
>>
>> - throw new RuntimeException("Can't parse " + keyStr +
>> - " as an instance of " + keyClass);
>> }
>>
>> @Override
>> - public void close() throws IOException { }
>> + public void close() throws IOException {
>> + // TODO Auto-generated method stub
>> +
>> + }
>>
>> - private String getLocation(Query<K, T> query) {
>> - if (!(query instanceof CassandraPartitionQuery)) {
>> - return null;
>> - }
>> - CassandraPartitionQuery<K, T> partitonQuery =
>> - (CassandraPartitionQuery<K, T>) query;
>> - InetAddress[] localAddresses = new InetAddress[0];
>> - try {
>> - localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
>> - } catch (UnknownHostException e) {
>> - throw new AssertionError(e);
>> - }
>> - for (InetAddress address : localAddresses) {
>> - for (String location : partitonQuery.getEndPoints()) {
>> - InetAddress locationAddress = null;
>> - try {
>> - locationAddress = InetAddress.getByName(location);
>> - } catch (UnknownHostException e) {
>> - throw new AssertionError(e);
>> - }
>> - if (address.equals(locationAddress)) {
>> - return location;
>> - }
>> - }
>> - }
>> - return partitonQuery.getEndPoints()[0];
>> + @Override
>> + public float getProgress() throws IOException {
>> + return (((float) this.rowNumber) / this.cassandraResultSet.size());
>> }
>> -}
>> \ No newline at end of file
>> +
>> + public void setResultSet(CassandraResultSet cassandraResultSet) {
>> + this.cassandraResultSet = cassandraResultSet;
>> + }
>> +
>> + public void setReverseMap(Map<String, String> reverseMap) {
>> + this.reverseMap = reverseMap;
>> + }
>> +
>> +}
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,36 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import java.util.ArrayList;
>> +import java.util.HashMap;
>> +
>> +/**
>> + * List data structure to keep the order coming from the Cassandra selects.
>> + */
>> +public class CassandraResultSet extends ArrayList<CassandraRow> {
>> +
>> + /**
>> + *
>> + */
>> + private static final long serialVersionUID = -7620939600192859652L;
>> +
>> + /**
>> + * Maps keys to indices in the list.
>> + */
>> + private HashMap<String, Integer> indexMap = new HashMap<String, Integer>();
>> +
>> + public CassandraRow getRow(String key) {
>> + Integer integer = this.indexMap.get(key);
>> + if (integer == null) {
>> + return null;
>> + }
>> +
>> + return this.get(integer);
>> + }
>> +
>> + public void putRow(String key, CassandraRow cassandraRow) {
>> + this.add(cassandraRow);
>> + this.indexMap.put(key, this.size()-1);
>> + }
>> +
>> +
>> +}
>> \ No newline at end of file
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,24 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import java.util.ArrayList;
>> +
>> +/**
>> + * List of key value pairs representing a row, tagged by a key.
>> + */
>> +public class CassandraRow extends ArrayList<CassandraColumn> {
>> +
>> + /**
>> + *
>> + */
>> + private static final long serialVersionUID = -7620939600192859652L;
>> + private String key;
>> +
>> + public String getKey() {
>> + return this.key;
>> + }
>> +
>> + public void setKey(String key) {
>> + this.key = key;
>> + }
>> +
>> +}
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,104 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import java.nio.ByteBuffer;
>> +import java.nio.CharBuffer;
>> +import java.nio.charset.CharacterCodingException;
>> +import java.nio.charset.Charset;
>> +import java.nio.charset.CharsetEncoder;
>> +
>> +import me.prettyprint.hector.api.beans.HColumn;
>> +
>> +import org.apache.avro.Schema;
>> +import org.apache.avro.Schema.Field;
>> +import org.apache.avro.Schema.Type;
>> +import org.apache.avro.generic.GenericArray;
>> +import org.apache.avro.generic.GenericData;
>> +import org.apache.avro.util.Utf8;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +public class CassandraSubColumn extends CassandraColumn {
>> + public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
>> +
>> + private static final String ENCODING = "UTF-8";
>> +
>> + private static CharsetEncoder charsetEncoder = Charset.forName(ENCODING).newEncoder();;
>> +
>> +
>> + /**
>> + * Key-value pair containing the raw data.
>> + */
>> + private HColumn<String, String> hColumn;
>> +
>> + public String getName() {
>> + return hColumn.getName();
>> + }
>> +
>> + /**
>> + * Deserialize a String into a typed Object, according to the field schema.
>> + * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
>> + */
>> + public Object getValue() {
>> + Field field = getField();
>> + Schema fieldSchema = field.schema();
>> + Type type = fieldSchema.getType();
>> + String valueString = hColumn.getValue();
>> + Object value = null;
>> +
>> + switch (type) {
>> + case STRING:
>> + value = new Utf8(valueString);
>> + break;
>> + case BYTES:
>> + // convert string to bytebuffer
>> + value = getByteBuffer(valueString);
>> + break;
>> + case INT:
>> + value = Integer.parseInt(valueString);
>> + break;
>> + case LONG:
>> + value = Long.parseLong(valueString);
>> + break;
>> + case FLOAT:
>> + value = Float.parseFloat(valueString);
>> + break;
>> + case ARRAY:
>> + // convert string to array
>> + valueString = valueString.substring(1, valueString.length()-1);
>> + String[] elements = valueString.split(", ");
>> +
>> + Type elementType = fieldSchema.getElementType().getType();
>> + if (elementType == Schema.Type.STRING) {
>> + // the array type is String
>> + GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
>> + for (String element: elements) {
>> + genericArray.add(element);
>> + }
>> +
>> + value = genericArray;
>> + } else {
>> + LOG.info("Element type not supported: " + elementType);
>> + }
>> + break;
>> + default:
>> + LOG.info("Type not supported: " + type);
>> + }
>> +
>> + return value;
>> +
>> + }
>> +
>> + public void setValue(HColumn<String, String> hColumn) {
>> + this.hColumn = hColumn;
>> + }
>> +
>> + public static ByteBuffer getByteBuffer(String valueString) {
>> + ByteBuffer byteBuffer = null;
>> + try {
>> + byteBuffer = charsetEncoder.encode(CharBuffer.wrap(valueString));
>> + } catch (CharacterCodingException cce) {
>> + LOG.warn("Unable to encode " + valueString + " into " + ENCODING);
>> + }
>> + return byteBuffer;
>> + }
>> +}
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,102 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import java.util.Map;
>> +
>> +import me.prettyprint.hector.api.beans.HColumn;
>> +import me.prettyprint.hector.api.beans.HSuperColumn;
>> +
>> +import org.apache.avro.Schema;
>> +import org.apache.avro.Schema.Field;
>> +import org.apache.avro.Schema.Type;
>> +import org.apache.avro.util.Utf8;
>> +import org.apache.gora.persistency.StatefulHashMap;
>> +import org.apache.gora.persistency.impl.PersistentBase;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +public class CassandraSuperColumn extends CassandraColumn {
>> + public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
>> +
>> + private HSuperColumn<String, String, String> hSuperColumn;
>> +
>> + public String getName() {
>> + return hSuperColumn.getName();
>> + }
>> +
>> + public Object getValue() {
>> + Field field = getField();
>> + Schema fieldSchema = field.schema();
>> + Type type = fieldSchema.getType();
>> +
>> + Object value = null;
>> +
>> + switch (type) {
>> + case MAP:
>> + Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
>> + Type valueType = fieldSchema.getValueType().getType();
>> +
>> + for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
>> + String memberString = hColumn.getValue();
>> + Object memberValue = null;
>> + switch (valueType) {
>> + case STRING:
>> + memberValue = new Utf8(memberString);
>> + break;
>> + case BYTES:
>> + memberValue = CassandraSubColumn.getByteBuffer(memberString);
>> + break;
>> + default:
>> + LOG.info("Type for the map value is not supported: " + valueType);
>> +
>> + }
>> + map.put(new Utf8(hColumn.getName()), memberValue);
>> + }
>> + value = map;
>> +
>> + break;
>> + case RECORD:
>> + String fullName = fieldSchema.getFullName();
>> +
>> + Class<?> claz = null;
>> + try {
>> + claz = Class.forName(fullName);
>> + } catch (ClassNotFoundException cnfe) {
>> + LOG.warn("Unable to load class " + fullName, cnfe);
>> + break;
>> + }
>> +
>> + try {
>> + value = claz.newInstance();
>> + } catch (InstantiationException ie) {
>> + LOG.warn("Instantiation error", ie);
>> + break;
>> + } catch (IllegalAccessException iae) {
>> + LOG.warn("Illegal access error", iae);
>> + break;
>> + }
>> +
>> + // we updated the value instance, now update its members
>> + if (value instanceof PersistentBase) {
>> + PersistentBase record = (PersistentBase) value;
>> +
>> + for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
>> + Field memberField = fieldSchema.getField(hColumn.getName());
>> + CassandraSubColumn cassandraColumn = new CassandraSubColumn();
>> + cassandraColumn.setField(memberField);
>> + cassandraColumn.setValue(hColumn);
>> + record.put(record.getFieldIndex(hColumn.getName()), cassandraColumn.getValue());
>> + }
>> + }
>> + break;
>> + default:
>> + LOG.info("Type not supported: " + type);
>> + }
>> +
>> + return value;
>> + }
>> +
>> + public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
>> + this.hSuperColumn = hSuperColumn;
>> + }
>> +
>> +}
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,253 @@
>> +package org.apache.gora.cassandra.store;
>> +
>> +import java.nio.ByteBuffer;
>> +import java.util.ArrayList;
>> +import java.util.Arrays;
>> +import java.util.HashMap;
>> +import java.util.List;
>> +import java.util.Map;
>> +
>> +import me.prettyprint.cassandra.serializers.StringSerializer;
>> +import me.prettyprint.cassandra.service.CassandraHostConfigurator;
>> +import me.prettyprint.hector.api.Cluster;
>> +import me.prettyprint.hector.api.Keyspace;
>> +import me.prettyprint.hector.api.Serializer;
>> +import me.prettyprint.hector.api.beans.OrderedRows;
>> +import me.prettyprint.hector.api.beans.OrderedSuperRows;
>> +import me.prettyprint.hector.api.beans.Row;
>> +import me.prettyprint.hector.api.beans.SuperRow;
>> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
>> +import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
>> +import me.prettyprint.hector.api.factory.HFactory;
>> +import me.prettyprint.hector.api.mutation.Mutator;
>> +import me.prettyprint.hector.api.query.QueryResult;
>> +import me.prettyprint.hector.api.query.RangeSlicesQuery;
>> +import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
>> +
>> +import org.apache.gora.cassandra.query.CassandraQuery;
>> +import org.apache.gora.mapreduce.GoraRecordReader;
>> +import org.apache.gora.persistency.Persistent;
>> +import org.apache.gora.query.Query;
>> +import org.apache.gora.util.ByteUtils;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +public class CassandraClient<K, T extends Persistent> {
>> + public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
>> +
>> + private Cluster cluster;
>> + private Keyspace keyspace;
>> + private Mutator<String> mutator;
>> +
>> + private CassandraMapping cassandraMapping = new CassandraMapping();
>> +
>> + private Serializer<String> stringSerializer = new StringSerializer();
>> +
>> + public void init() throws Exception {
>> + this.cassandraMapping.loadConfiguration();
>> + this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
>> +
>> + // add keyspace to cluster
>> + checkKeyspace();
>> +
>> + // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
>> + this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
>> +
>> + this.mutator = HFactory.createMutator(this.keyspace, this.stringSerializer);
>> + }
>> +
>> + /**
>> + * Check if keyspace already exists. If not, create it.
>> + */
>> + public void checkKeyspace() {
>> + // "describe keyspace <keyspaceName>;" query
>> + KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
>> + if (keyspaceDefinition == null) {
>> + List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
>> + keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
>> + this.cluster.addKeyspace(keyspaceDefinition);
>> + LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
>> +
>> + keyspaceDefinition = null;
>> + }
>> +
>> +
>> + }
>> +
>> + /**
>> + * Drop keyspace.
>> + */
>> + public void dropKeyspace() {
>> + // "drop keyspace <keyspaceName>;" query
>> + this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
>> + }
>> +
>> + /**
>> + * Insert a field in a column.
>> + * @param key the row key
>> + * @param fieldName the field name
>> + * @param value the field value.
>> + */
>> + public void addColumn(String key, String fieldName, Object value) {
>> + if (value == null) {
>> + return;
>> + }
>> + if (value instanceof ByteBuffer) {
>> + value = toString((ByteBuffer) value);
>> + }
>> +
>> + String columnFamily = this.cassandraMapping.getFamily(fieldName);
>> + String columnName = this.cassandraMapping.getColumn(fieldName);
>> +
>> + this.mutator.insert(key, columnFamily, HFactory.createStringColumn(columnName, value.toString()));
>> + }
>> +
>> + /**
>> + * TODO do not convert bytes to string to store a binary field
>> + * @param value
>> + * @return
>> + */
>> + private static String toString(ByteBuffer value) {
>> + ByteBuffer byteBuffer = (ByteBuffer) value;
>> + return ByteUtils.toString(byteBuffer.array(), 0, byteBuffer.limit());
>> + }
>> +
>> + /**
>> + * Insert a member in a super column. This is used for map and record Avro types.
>> + * @param key the row key
>> + * @param fieldName the field name
>> + * @param memberName the member name
>> + * @param value the member value
>> + */
>> + public void addSubColumn(String key, String fieldName, String memberName, Object value) {
>> + if (value == null) {
>> + return;
>> + }
>> +
>> + if (value instanceof ByteBuffer) {
>> + value = toString((ByteBuffer) value);
>> + }
>> +
>> + String columnFamily = this.cassandraMapping.getFamily(fieldName);
>> + String superColumnName = this.cassandraMapping.getColumn(fieldName);
>> +
>> + this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createStringColumn(memberName, value.toString())), this.stringSerializer, this.stringSerializer, this.stringSerializer));
>> +
>> + }
>> +
>> + /**
>> + * Select a family column in the keyspace.
>> + * @param cassandraQuery a wrapper of the query
>> + * @param family the family name to be queried
>> + * @return a list of family rows
>> + */
>> + public List<Row<String, String, String>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
>> +
>> + String[] columnNames = cassandraQuery.getColumns(family);
>> + Query<K, T> query = cassandraQuery.getQuery();
>> + int limit = (int) query.getLimit();
>> + String startKey = (String) query.getStartKey();
>> + String endKey = (String) query.getEndKey();
>> +
>> + if (startKey == null) {
>> + startKey = "";
>> + }
>> + if (endKey == null) {
>> + endKey = "";
>> + }
>> +
>> +
>> + RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.stringSerializer, stringSerializer, stringSerializer);
>> + rangeSlicesQuery.setColumnFamily(family);
>> + rangeSlicesQuery.setKeys(startKey, endKey);
>> + rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
>> + rangeSlicesQuery.setRowCount(limit);
>> + rangeSlicesQuery.setColumnNames(columnNames);
>> +
>> +
>> + QueryResult<OrderedRows<String, String, String>> queryResult = rangeSlicesQuery.execute();
>> + OrderedRows<String, String, String> orderedRows = queryResult.get();
>> +
>> +
>> + return orderedRows.getList();
>> + }
>> +
>> + /**
>> + * Select the families that contain at least one column mapped to a query field.
>> + * @param query indicates the columns to select
>> + * @return a map whose keys are the family names and whose values are the corresponding column names required to get all the query fields.
>> + */
>> + public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
>> + Map<String, List<String>> map = new HashMap<String, List<String>>();
>> + for (String field: query.getFields()) {
>> + String family = this.cassandraMapping.getFamily(field);
>> + String column = this.cassandraMapping.getColumn(field);
>> +
>> + // check if the family value was already initialized
>> + List<String> list = map.get(family);
>> + if (list == null) {
>> + list = new ArrayList<String>();
>> + map.put(family, list);
>> + }
>> +
>> + if (column != null) {
>> + list.add(column);
>> + }
>> +
>> + }
>> +
>> + return map;
>> + }
>> +
>> + /**
>> + * Select the field names according to the column names, whose format is fully qualified: "family:column"
>> + * @param query
>> + * @return a map whose keys are the fully qualified column names and whose values are the query fields
>> + */
>> + public Map<String, String> getReverseMap(Query<K, T> query) {
>> + Map<String, String> map = new HashMap<String, String>();
>> + for (String field: query.getFields()) {
>> + String family = this.cassandraMapping.getFamily(field);
>> + String column = this.cassandraMapping.getColumn(field);
>> +
>> + map.put(family + ":" + column, field);
>> + }
>> +
>> + return map;
>> +
>> + }
>> +
>> + public boolean isSuper(String family) {
>> + return this.cassandraMapping.isSuper(family);
>> + }
>> +
>> + public List<SuperRow<String, String, String, String>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
>> + String[] columnNames = cassandraQuery.getColumns(family);
>> + Query<K, T> query = cassandraQuery.getQuery();
>> + int limit = (int) query.getLimit();
>> + String startKey = (String) query.getStartKey();
>> + String endKey = (String) query.getEndKey();
>> +
>> + if (startKey == null) {
>> + startKey = "";
>> + }
>> + if (endKey == null) {
>> + endKey = "";
>> + }
>> +
>> +
>> + RangeSuperSlicesQuery<String, String, String, String> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.stringSerializer, this.stringSerializer, this.stringSerializer, this.stringSerializer);
>> + rangeSuperSlicesQuery.setColumnFamily(family);
>> + rangeSuperSlicesQuery.setKeys(startKey, endKey);
>> + rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
>> + rangeSuperSlicesQuery.setRowCount(limit);
>> + rangeSuperSlicesQuery.setColumnNames(columnNames);
>> +
>> +
>> + QueryResult<OrderedSuperRows<String, String, String, String>> queryResult = rangeSuperSlicesQuery.execute();
>> + OrderedSuperRows<String, String, String, String> orderedRows = queryResult.get();
>> + return orderedRows.getList();
>> +
>> +
>> + }
>> +}
>>
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jul 22 00:33:59 2011
>> @@ -1,50 +1,155 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> package org.apache.gora.cassandra.store;
>>
>> +import java.io.IOException;
>> +import java.util.ArrayList;
>> import java.util.HashMap;
>> +import java.util.List;
>> import java.util.Map;
>> -import java.util.Set;
>> +
>> +import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
>> +import me.prettyprint.cassandra.service.ThriftCfDef;
>> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
>> +import me.prettyprint.hector.api.ddl.ColumnType;
>> +import me.prettyprint.hector.api.ddl.ComparatorType;
>> +
>> +import org.jdom.Document;
>> +import org.jdom.Element;
>> +import org.jdom.JDOMException;
>> +import org.jdom.input.SAXBuilder;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>>
>> public class CassandraMapping {
>> +
>> + public static final Logger LOG = LoggerFactory.getLogger(CassandraMapping.class);
>> +
>> + private static final String MAPPING_FILE = "gora-cassandra-mapping.xml";
>> + private static final String KEYSPACE_ELEMENT = "keyspace";
>> + private static final String NAME_ATTRIBUTE = "name";
>> + private static final String MAPPING_ELEMENT = "class";
>> + private static final String COLUMN_ATTRIBUTE = "qualifier";
>> + private static final String FAMILY_ATTRIBUTE = "family";
>> + private static final String SUPER_ATTRIBUTE = "type";
>> + private static final String CLUSTER_ATTRIBUTE = "cluster";
>> + private static final String HOST_ATTRIBUTE = "host";
>> +
>> +
>> + private String hostName;
>> + private String clusterName;
>> + private String keyspaceName;
>> +
>> +
>> + /**
>> + * List of the super column families.
>> + */
>> + private List<String> superFamilies = new ArrayList<String>();
>> +
>> + /**
>> + * Look up the column family associated to the Avro field.
>> + */
>> + private Map<String, String> familyMap = new HashMap<String, String>();
>> +
>> + /**
>> + * Look up the column associated to the Avro field.
>> + */
>> + private Map<String, String> columnMap = new HashMap<String, String>();
>> +
>> + /**
>> + * Look up the column family from its name.
>> + */
>> + private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = new HashMap<String, BasicColumnFamilyDefinition>();
>>
>> - private String keySpace;
>> + public String getHostName() {
>> + return this.hostName;
>> + }
>>
>> - private Map<String, Boolean> families =
>> - new HashMap<String, Boolean>();
>> + public String getClusterName() {
>> + return this.clusterName;
>> + }
>>
>> - public String getKeySpace() {
>> - return keySpace;
>> + public String getKeyspaceName() {
>> + return this.keyspaceName;
>> }
>>
>> - public void setKeySpace(String keySpace) {
>> - this.keySpace = keySpace;
>> +
>> + @SuppressWarnings("unchecked")
>> + public void loadConfiguration() throws JDOMException, IOException {
>> + SAXBuilder saxBuilder = new SAXBuilder();
>> + Document document = saxBuilder.build(getClass().getClassLoader().getResourceAsStream(MAPPING_FILE));
>> + Element root = document.getRootElement();
>> +
>> + Element keyspace = root.getChild(KEYSPACE_ELEMENT);
>> + this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
>> + this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
>> + this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
>> +
>> + // load column family definitions
>> + List<Element> elements = keyspace.getChildren();
>> + for (Element element: elements) {
>> + BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition();
>> +
>> + String familyName = element.getAttributeValue(NAME_ATTRIBUTE);
>> +
>> + String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
>> + if (superAttribute != null) {
>> + this.superFamilies.add(familyName);
>> + cfDef.setColumnType(ColumnType.SUPER);
>> + cfDef.setSubComparatorType(ComparatorType.UTF8TYPE);
>> + }
>> +
>> + cfDef.setKeyspaceName(this.keyspaceName);
>> + cfDef.setName(familyName);
>> + cfDef.setComparatorType(ComparatorType.UTF8TYPE);
>> + cfDef.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
>> +
>> + this.columnFamilyDefinitions.put(familyName, cfDef);
>> +
>> + }
>> +
>> + // load column definitions
>> + Element mapping = root.getChild(MAPPING_ELEMENT);
>> + elements = mapping.getChildren();
>> + for (Element element: elements) {
>> + String fieldName = element.getAttributeValue(NAME_ATTRIBUTE);
>> + String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
>> + String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
>> + BasicColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(familyName);
>> + if (columnFamilyDefinition == null) {
>> + LOG.warn("Family " + familyName + " was not declared in the keyspace.");
>> + }
>> +
>> + this.familyMap.put(fieldName, familyName);
>> + this.columnMap.put(fieldName, columnName);
>> +
>> + }
>> }
>>
>> - public Set<String> getColumnFamilies() {
>> - return families.keySet();
>> + public String getFamily(String name) {
>> + return this.familyMap.get(name);
>> }
>>
>> - public void addColumnFamily(String columnFamily, boolean isSuper) {
>> - families.put(columnFamily, isSuper);
>> + public String getColumn(String name) {
>> + return this.columnMap.get(name);
>> }
>>
>> - public boolean isColumnFamilySuper(String columnFamily) {
>> - return families.get(columnFamily);
>> + /**
>> + * Read family super attribute.
>> + * @param family the family name
>> + * @return true if the family is a super column family
>> + */
>> + public boolean isSuper(String family) {
>> + return this.superFamilies.indexOf(family) != -1;
>> }
>> +
>> + public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() {
>> + List<ColumnFamilyDefinition> list = new ArrayList<ColumnFamilyDefinition>();
>> + for (String key: this.columnFamilyDefinitions.keySet()) {
>> + ColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(key);
>> + ThriftCfDef thriftCfDef = new ThriftCfDef(columnFamilyDefinition);
>> + list.add(thriftCfDef);
>> + }
>> +
>> + return list;
>> + }
>> +
>> }
>>
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 22 00:33:59 2011
>> @@ -1,465 +1,334 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements. See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership. The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License. You may obtain a copy of the License at
>> - *
>> - * http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> package org.apache.gora.cassandra.store;
>>
>> import java.io.IOException;
>> -import java.net.InetAddress;
>> import java.util.ArrayList;
>> import java.util.HashMap;
>> import java.util.List;
>> import java.util.Map;
>> -import java.util.Map.Entry;
>> -import java.util.Properties;
>> -import java.util.Set;
>> +
>> +import me.prettyprint.hector.api.beans.ColumnSlice;
>> +import me.prettyprint.hector.api.beans.HColumn;
>> +import me.prettyprint.hector.api.beans.HSuperColumn;
>> +import me.prettyprint.hector.api.beans.Row;
>> +import me.prettyprint.hector.api.beans.SuperRow;
>> +import me.prettyprint.hector.api.beans.SuperSlice;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.Schema.Field;
>> import org.apache.avro.Schema.Type;
>> import org.apache.avro.generic.GenericArray;
>> import org.apache.avro.util.Utf8;
>> -import org.apache.cassandra.thrift.TokenRange;
>> -import org.apache.gora.cassandra.client.CassandraClient;
>> -import org.apache.gora.cassandra.client.Mutate;
>> -import org.apache.gora.cassandra.client.Row;
>> -import org.apache.gora.cassandra.client.Select;
>> -import org.apache.gora.cassandra.client.SimpleCassandraClient;
>> -import org.apache.gora.cassandra.query.CassandraPartitionQuery;
>> import org.apache.gora.cassandra.query.CassandraQuery;
>> import org.apache.gora.cassandra.query.CassandraResult;
>> -import org.apache.gora.persistency.ListGenericArray;
>> +import org.apache.gora.cassandra.query.CassandraResultSet;
>> +import org.apache.gora.cassandra.query.CassandraRow;
>> +import org.apache.gora.cassandra.query.CassandraSubColumn;
>> +import org.apache.gora.cassandra.query.CassandraSuperColumn;
>> import org.apache.gora.persistency.Persistent;
>> -import org.apache.gora.persistency.State;
>> -import org.apache.gora.persistency.StateManager;
>> import org.apache.gora.persistency.StatefulHashMap;
>> -import org.apache.gora.persistency.StatefulMap;
>> +import org.apache.gora.persistency.impl.PersistentBase;
>> +import org.apache.gora.persistency.impl.StateManagerImpl;
>> import org.apache.gora.query.PartitionQuery;
>> import org.apache.gora.query.Query;
>> import org.apache.gora.query.Result;
>> -import org.apache.gora.store.DataStoreFactory;
>> +import org.apache.gora.query.impl.PartitionQueryImpl;
>> import org.apache.gora.store.impl.DataStoreBase;
>> -import org.apache.gora.util.ByteUtils;
>> -import org.jdom.Document;
>> -import org.jdom.Element;
>> -import org.jdom.input.SAXBuilder;
>> -
>> -/**
>> - * DataStore for Cassandra.
>> - *
>> - * <p> Note: CassandraStore is not thread-safe. </p>
>> - */
>> -public class CassandraStore<K, T extends Persistent>
>> -extends DataStoreBase<K, T> {
>> -
>> - private static final String ERROR_MESSAGE =
>> - "Cassandra does not support creating or modifying ColumnFamilies during runtime";
>> -
>> - private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml";
>> -
>> - private static final int SPLIT_SIZE = 65536;
>> -
>> - private static final int BATCH_COUNT = 256;
>> -
>> - private CassandraClient client;
>> -
>> - private Map<String, CassandraColumn> columnMap;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>>
>> - private CassandraMapping mapping;
>> -
>> - @Override
>> - public void initialize(Class<K> keyClass, Class<T> persistentClass,
>> - Properties properties) throws IOException {
>> - super.initialize(keyClass, persistentClass, properties);
>> -
>> - String mappingFile =
>> - DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
>> -
>> - readMapping(mappingFile);
>> +public class CassandraStore<K, T extends Persistent> extends DataStoreBase<K, T> {
>> + public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
>> +
>> + private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
>> +
>> + /**
>> + * The values are Avro fields pending to be stored.
>> + */
>> + private Map<K, T> buffer = new HashMap<K, T>();
>> +
>> + public CassandraStore() throws Exception {
>> + this.cassandraClient.init();
>> }
>>
>> @Override
>> - public String getSchemaName() {
>> - return mapping.getKeySpace();
>> + public void close() throws IOException {
>> + LOG.debug("close");
>> + flush();
>> }
>>
>> @Override
>> - public void createSchema() throws IOException {
>> - throw new UnsupportedOperationException(ERROR_MESSAGE);
>> + public void createSchema() {
>> + LOG.debug("create schema");
>> + this.cassandraClient.checkKeyspace();
>> }
>>
>> @Override
>> - public void deleteSchema() throws IOException {
>> - throw new UnsupportedOperationException(ERROR_MESSAGE);
>> + public boolean delete(K key) throws IOException {
>> + LOG.debug("delete " + key);
>> + return false;
>> }
>>
>> @Override
>> - public boolean schemaExists() throws IOException {
>> - return true;
>> + public long deleteByQuery(Query<K, T> query) throws IOException {
>> + LOG.debug("delete by query " + query);
>> + return 0;
>> }
>>
>> - public CassandraClient getClientByLocation(String endPoint) {
>> - return client;
>> + @Override
>> + public void deleteSchema() throws IOException {
>> + LOG.debug("delete schema");
>> + this.cassandraClient.dropKeyspace();
>> }
>>
>> - public Select createSelect(String[] fields) {
>> - Select select = new Select();
>> - if (fields == null) {
>> - fields = beanFactory.getCachedPersistent().getFields();
>> - }
>> - for (String f : fields) {
>> - CassandraColumn col = columnMap.get(f);
>> - Schema fieldSchema = fieldMap.get(f).schema();
>> - switch (fieldSchema.getType()) {
>> - case MAP:
>> - case ARRAY:
>> - if (col.isSuperColumn()) {
>> - select.addAllColumnsForSuperColumn(col.family, col.superColumn);
>> - } else {
>> - select.addColumnAll(col.family);
>> - }
>> - break;
>> - default:
>> - if (col.isSuperColumn()) {
>> - select.addColumnName(col.family, col.superColumn, col.column);
>> - } else {
>> - select.addColumnName(col.family, col.column);
>> - }
>> - break;
>> + @Override
>> + public Result<K, T> execute(Query<K, T> query) throws IOException {
>> +
>> + Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
>> + Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
>> +
>> + CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
>> + cassandraQuery.setQuery(query);
>> + cassandraQuery.setFamilyMap(familyMap);
>> +
>> + CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
>> + cassandraResult.setReverseMap(reverseMap);
>> +
>> + CassandraResultSet cassandraResultSet = new CassandraResultSet();
>> +
>> + // We query Cassandra keyspace by families.
>> + for (String family : familyMap.keySet()) {
>> + if (this.cassandraClient.isSuper(family)) {
>> + addSuperColumns(family, cassandraQuery, cassandraResultSet);
>> +
>> + } else {
>> + addSubColumns(family, cassandraQuery, cassandraResultSet);
>> +
>> }
>> +
>> }
>> - return select;
>> - }
>> +
>> + cassandraResult.setResultSet(cassandraResultSet);
>> +
>> +
>> + return cassandraResult;
>>
>> - @Override
>> - public T get(K key, String[] fields) throws IOException {
>> - if (fields == null) {
>> - fields = beanFactory.getCachedPersistent().getFields();
>> - }
>> - Select select = createSelect(fields);
>> - try {
>> - Row result = client.get(key.toString(), select);
>> - return newInstance(result, fields);
>> - } catch (Exception e) {
>> - throw new IOException(e);
>> - }
>> }
>>
>> - @SuppressWarnings("rawtypes")
>> - private void setField(T persistent, Field field, StatefulMap map) {
>> - persistent.put(field.pos(), map);
>> - }
>> -
>> - private void setField(T persistent, Field field, byte[] val)
>> - throws IOException {
>> - persistent.put(field.pos()
>> - , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos())));
>> - }
>> -
>> - @SuppressWarnings("rawtypes")
>> - private void setField(T persistent, Field field, GenericArray list) {
>> - persistent.put(field.pos(), list);
>> - }
>> -
>> - @SuppressWarnings({ "rawtypes", "unchecked" })
>> - public T newInstance(Row result, String[] fields)
>> - throws IOException {
>> - if(result == null)
>> - return null;
>> -
>> - T persistent = newPersistent();
>> - StateManager stateManager = persistent.getStateManager();
>> - for (String f : fields) {
>> - CassandraColumn col = columnMap.get(f);
>> - Field field = fieldMap.get(f);
>> - Schema fieldSchema = field.schema();
>> - Map<String, byte[]> qualMap;
>> - switch(fieldSchema.getType()) {
>> - case MAP:
>> - if (col.isSuperColumn()) {
>> - qualMap = result.getSuperColumn(col.family, col.superColumn);
>> - } else {
>> - qualMap = result.getColumn(col.family);
>> - }
>> - if (qualMap == null) {
>> - continue;
>> - }
>> - Schema valueSchema = fieldSchema.getValueType();
>> - StatefulMap map = new StatefulHashMap();
>> - for (Entry<String, byte[]> e : qualMap.entrySet()) {
>> - Utf8 mapKey = new Utf8(e.getKey());
>> - map.put(mapKey, ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
>> - map.putState(mapKey, State.CLEAN);
>> - }
>> - setField(persistent, field, map);
>> - break;
>> - case ARRAY:
>> - if (col.isSuperColumn()) {
>> - qualMap = result.getSuperColumn(col.family, col.superColumn);
>> - } else {
>> - qualMap = result.getColumn(col.family);
>> - }
>> - if (qualMap == null) {
>> - continue;
>> - }
>> - valueSchema = fieldSchema.getElementType();
>> - ArrayList arrayList = new ArrayList();
>> - for (Entry<String, byte[]> e : qualMap.entrySet()) {
>> - arrayList.add(ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
>> - }
>> - ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
>> - setField(persistent, field, arr);
>> - break;
>> - default:
>> - byte[] val;
>> - if (col.isSuperColumn()) {
>> - val = result.get(col.family, col.superColumn, col.column);
>> - } else {
>> - val = result.get(col.family, col.column);
>> - }
>> - if (val == null) {
>> - continue;
>> - }
>> - setField(persistent, field, val);
>> - break;
>> + private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
>> + CassandraResultSet cassandraResultSet) {
>> + // select family columns that are included in the query
>> + List<Row<String, String, String>> rows = this.cassandraClient.execute(cassandraQuery, family);
>> +
>> + for (Row<String, String, String> row : rows) {
>> + String key = row.getKey();
>> +
>> + // find associated row in the resultset
>> + CassandraRow cassandraRow = cassandraResultSet.getRow(key);
>> + if (cassandraRow == null) {
>> + cassandraRow = new CassandraRow();
>> + cassandraResultSet.putRow(key, cassandraRow);
>> + cassandraRow.setKey(key);
>> }
>> +
>> + ColumnSlice<String, String> columnSlice = row.getColumnSlice();
>> +
>> + for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
>> + CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
>> + cassandraSubColumn.setValue(hColumn);
>> + cassandraSubColumn.setFamily(family);
>> + cassandraRow.add(cassandraSubColumn);
>> + }
>> +
>> }
>> - stateManager.clearDirty(persistent);
>> - return persistent;
>> }
>>
>> - @Override
>> - public void put(K key, T obj) throws IOException {
>> - Mutate mutate = new Mutate();
>> - Schema schema = obj.getSchema();
>> - StateManager stateManager = obj.getStateManager();
>> - List<Field> fields = schema.getFields();
>> - String qual;
>> - byte[] value;
>> - for (int i = 0; i < fields.size(); i++) {
>> - if (!stateManager.isDirty(obj, i)) {
>> - continue;
>> + private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
>> + CassandraResultSet cassandraResultSet) {
>> +
>> + List<SuperRow<String, String, String, String>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
>> + for (SuperRow<String, String, String, String> superRow: superRows) {
>> + String key = superRow.getKey();
>> + CassandraRow cassandraRow = cassandraResultSet.getRow(key);
>> + if (cassandraRow == null) {
>> + cassandraRow = new CassandraRow();
>> + cassandraResultSet.putRow(key, cassandraRow);
>> + cassandraRow.setKey(key);
>> }
>> - Field field = fields.get(i);
>> - Type type = field.schema().getType();
>> - Object o = obj.get(i);
>> - CassandraColumn col = columnMap.get(field.name());
>> -
>> - switch(type) {
>> - case MAP:
>> - if(o instanceof StatefulMap) {
>> - @SuppressWarnings("unchecked")
>> - StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) o;
>> - for (Entry<Utf8, State> e : map.states().entrySet()) {
>> - Utf8 mapKey = e.getKey();
>> - switch (e.getValue()) {
>> - case DIRTY:
>> - qual = mapKey.toString();
>> - value = ByteUtils.toBytes(map.get(mapKey), field.schema().getValueType(), datumWriter);
>> - if (col.isSuperColumn()) {
>> - mutate.put(col.family, col.superColumn, qual, value);
>> - } else {
>> - mutate.put(col.family, qual, value);
>> - }
>> - break;
>> - case DELETED:
>> - qual = mapKey.toString();
>> - if (col.isSuperColumn()) {
>> - mutate.delete(col.family, col.superColumn, qual);
>> - } else {
>> - mutate.delete(col.family, qual);
>> - }
>> - break;
>> - }
>> - }
>> - } else {
>> - @SuppressWarnings({ "rawtypes", "unchecked" })
>> - Set<Map.Entry> set = ((Map)o).entrySet();
>> - for(@SuppressWarnings("rawtypes") Entry entry: set) {
>> - qual = entry.getKey().toString();
>> - value = ByteUtils.toBytes(entry.getValue().toString());
>> - if (col.isSuperColumn()) {
>> - mutate.put(col.family, col.superColumn, qual, value);
>> - } else {
>> - mutate.put(col.family, qual, value);
>> - }
>> - }
>> - }
>> - break;
>> - case ARRAY:
>> - if(o instanceof GenericArray) {
>> - @SuppressWarnings("rawtypes")
>> - GenericArray arr = (GenericArray) o;
>> - int j=0;
>> - for(Object item : arr) {
>> - value = ByteUtils.toBytes(item.toString());
>> - if (col.isSuperColumn()) {
>> - mutate.put(col.family, col.superColumn, Integer.toString(j), value);
>> - } else {
>> - mutate.put(col.family, Integer.toString(j), value);
>> - }
>> - j++;
>> - }
>> - }
>> - break;
>> - default:
>> - value = ByteUtils.toBytes(o, field.schema(), datumWriter);
>> - if (col.isSuperColumn()) {
>> - mutate.put(col.family, col.superColumn, col.column, value);
>> - } else {
>> - mutate.put(col.family, col.column, value);
>> - }
>> - break;
>> +
>> + SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
>> + for (HSuperColumn<String, String, String> hSuperColumn: superSlice.getSuperColumns()) {
>> + CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
>> + cassandraSuperColumn.setValue(hSuperColumn);
>> + cassandraSuperColumn.setFamily(family);
>> + cassandraRow.add(cassandraSuperColumn);
>> }
>> }
>> -
>> - if(!mutate.isEmpty())
>> - client.mutate(key.toString(), mutate);
>> }
>>
>> - @Override
>> - public boolean delete(K key) throws IOException {
>> - Mutate mutate = new Mutate();
>> - for (String family : mapping.getColumnFamilies()) {
>> - mutate.deleteAll(family);
>> + /**
>> + * Flush the buffer. Write the buffered rows.
>> + * @see org.apache.gora.store.DataStore#flush()
>> + */
>> + @Override
>> + public void flush() throws IOException {
>> + for (K key: this.buffer.keySet()) {
>> + T value = this.buffer.get(key);
>> + Schema schema = value.getSchema();
>> + for (Field field: schema.getFields()) {
>> + if (value.isDirty(field.pos())) {
>> + addOrUpdateField((String) key, field, value.get(field.pos()));
>> + }
>> + }
>> }
>> -
>> - client.mutate(key.toString(), mutate);
>> - return true;
>> +
>> + this.buffer.clear();
>> }
>>
>> @Override
>> - public void flush() throws IOException { }
>> -
>> - @Override
>> - public void close() throws IOException {
>> - client.close();
>> + public T get(K key, String[] fields) throws IOException {
>> + LOG.info("get " + key);
>> + return null;
>> }
>>
>> @Override
>> - public Query<K, T> newQuery() {
>> - return new CassandraQuery<K, T>(this);
>> + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
>> + throws IOException {
>> + // just a single partition
>> + List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
>> + partitions.add(new PartitionQueryImpl<K,T>(query));
>> + return partitions;
>> }
>>
>> @Override
>> - public long deleteByQuery(Query<K, T> query) throws IOException {
>> - // TODO Auto-generated method stub
>> - return 0;
>> + public String getSchemaName() {
>> + LOG.info("get schema name");
>> + return null;
>> }
>>
>> @Override
>> - public Result<K, T> execute(Query<K, T> query) throws IOException {
>> - return new CassandraResult<K, T>(this, query, BATCH_COUNT);
>> + public Query<K, T> newQuery() {
>> + return new CassandraQuery<K, T>(this);
>> }
>>
>> - @Override
>> - public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
>> - throws IOException {
>> - List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
>> -
>> - List<TokenRange> rangeList = client.describeRing();
>> - for (TokenRange range : rangeList) {
>> - List<String> tokens =
>> - client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE);
>> - // turn the sub-ranges into InputSplits
>> - String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
>> - // hadoop needs hostname, not ip
>> - for (int i = 0; i < endpoints.length; i++) {
>> - endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
>> - }
>> -
>> - for (int i = 1; i < tokens.size(); i++) {
>> - CassandraPartitionQuery<K, T> partitionQuery =
>> - new CassandraPartitionQuery<K, T>(query, tokens.get(i - 1), tokens.get(i), endpoints, SPLIT_SIZE);
>> - partitions.add(partitionQuery);
>> + /**
>> + * Duplicate instance to keep all the objects in memory till flushing.
>> + * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
>> + */
>> + @Override
>> + public void put(K key, T value) throws IOException {
>> + T p = (T) value.newInstance(new StateManagerImpl());
>> + Schema schema = value.getSchema();
>> + for (Field field: schema.getFields()) {
>> + if (value.isDirty(field.pos())) {
>> + Object fieldValue = value.get(field.pos());
>> +
>> + // check if field has a nested structure (map or record)
>> + Schema fieldSchema = field.schema();
>> + Type type = fieldSchema.getType();
>> + switch(type) {
>> + case RECORD:
>> + Persistent persistent = (Persistent) fieldValue;
>> + Persistent newRecord = persistent.newInstance(new StateManagerImpl());
>> + for (Field member: fieldSchema.getFields()) {
>> + newRecord.put(member.pos(), persistent.get(member.pos()));
>> + }
>> + fieldValue = newRecord;
>> + break;
>> + case MAP:
>> + StatefulHashMap<?, ?> map = (StatefulHashMap<?, ?>) fieldValue;
>> + StatefulHashMap<?, ?> newMap = new StatefulHashMap(map);
>> + fieldValue = newMap;
>> + break;
>> + }
>> +
>> + p.put(field.pos(), fieldValue);
>> }
>> }
>> - return partitions;
>> - }
>> -
>> - private CassandraClient createClient() throws IOException {
>> - String serverStr =
>> - DataStoreFactory.findPropertyOrDie(properties, this, "servers");
>> - String[] server1Parts = serverStr.split(",")[0].split(":");
>> - try {
>> - return new SimpleCassandraClient(server1Parts[0],
>> - Integer.parseInt(server1Parts[1]), mapping.getKeySpace());
>> - } catch (Exception e) {
>> - throw new IOException(e);
>> - }
>> - }
>> -
>> - @SuppressWarnings("unchecked")
>> - protected void readMapping(String filename) throws IOException {
>> -
>> - mapping = new CassandraMapping();
>> - columnMap = new HashMap<String, CassandraColumn>();
>> -
>> - try {
>> - SAXBuilder builder = new SAXBuilder();
>> - Document doc = builder.build(getClass().getClassLoader()
>> - .getResourceAsStream(filename));
>> -
>> - List<Element> classes = doc.getRootElement().getChildren("class");
>> -
>> - for(Element classElement: classes) {
>> - if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
>> - && classElement.getAttributeValue("name").equals(
>> - persistentClass.getCanonicalName())) {
>> -
>> - String keySpace = classElement.getAttributeValue("keyspace");
>> - mapping.setKeySpace(keySpace);
>> - client = createClient();
>> - Map<String, Map<String, String>> keySpaceDesc = client.describeKeySpace();
>> - for (Entry<String, Map<String, String>> e : keySpaceDesc.entrySet()) {
>> - boolean isSuper = e.getValue().get("Type").equals("Super");
>> - mapping.addColumnFamily(e.getKey(), isSuper);
>> - }
>> -
>> - List<Element> fields = classElement.getChildren("field");
>> -
>> - for(Element field:fields) {
>> - String fieldName = field.getAttributeValue("name");
>> - String path = field.getAttributeValue("path");
>> - String[] parts = path.split(":");
>> - String columnFamily = parts[0];
>> - String superColumn = null;
>> - String column = null;
>> -
>> - boolean isSuper = mapping.isColumnFamilySuper(columnFamily);
>> - if (isSuper) {
>> - superColumn = parts[1];
>> - if (parts.length == 3) {
>> - column = parts[2];
>> +
>> + this.buffer.put(key, p);
>> + }
>> +
>> + /**
>> + * Add a field to Cassandra according to its type.
>> + * @param key the key of the row where the field should be added
>> + * @param field the Avro field representing a datum
>> + * @param value the field value
>> + */
>> + private void addOrUpdateField(String key, Field field, Object value) {
>> + Schema schema = field.schema();
>> + Type type = schema.getType();
>> + //LOG.info(field.name() + " " + type.name());
>> + switch (type) {
>> + case STRING:
>> + this.cassandraClient.addColumn(key, field.name(), value);
>> + break;
>> + case INT:
>> + this.cassandraClient.addColumn(key, field.name(), value);
>> + break;
>> + case LONG:
>> + this.cassandraClient.addColumn(key, field.name(), value);
>> + break;
>> + case BYTES:
>> + this.cassandraClient.addColumn(key, field.name(), value);
>> + break;
>> + case FLOAT:
>> + this.cassandraClient.addColumn(key, field.name(), value);
>> + break;
>> + case RECORD:
>> + if (value != null) {
>> + if (value instanceof PersistentBase) {
>> + PersistentBase persistentBase = (PersistentBase) value;
>> + for (Field member: schema.getFields()) {
>> +
>> + // TODO: hack, do not store empty arrays
>> + Object memberValue = persistentBase.get(member.pos());
>> + if (memberValue instanceof GenericArray<?>) {
>> + GenericArray<String> array = (GenericArray<String>) memberValue;
>> + if (array.size() == 0) {
>> + continue;
>> + }
>> }
>> - } else {
>> - if (parts.length == 2) {
>> - column = parts[1];
>> +
>> + this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
>> + }
>> + } else {
>> + LOG.info("Record not supported: " + value.toString());
>> +
>> + }
>> + }
>> + break;
>> + case MAP:
>> + if (value != null) {
>> + if (value instanceof StatefulHashMap<?, ?>) {
>> + //TODO cast to stateful map and only write dirty keys
>> + Map<Utf8, Object> map = (Map<Utf8, Object>) value;
>> + for (Utf8 mapKey: map.keySet()) {
>> +
>> + // TODO: hack, do not store empty arrays
>> + Object keyValue = map.get(mapKey);
>> + if (keyValue instanceof GenericArray<?>) {
>> + GenericArray<String> array = (GenericArray<String>) keyValue;
>> + if (array.size() == 0) {
>> + continue;
>> + }
>> }
>> +
>> + this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
>> }
>> -
>> - columnMap.put(fieldName,
>> - new CassandraColumn(columnFamily, superColumn, column));
>> + } else {
>> + LOG.info("Map not supported: " + value.toString());
>> }
>> -
>> - break;
>> }
>> - }
>> - } catch(Exception ex) {
>> - throw new IOException(ex);
>> + break;
>> + default:
>> + LOG.info("Type not considered: " + type.name());
>> }
>> }
>> -}
>> \ No newline at end of file
>> +
>> + @Override
>> + public boolean schemaExists() throws IOException {
>> + LOG.info("schema exists");
>> + return false;
>> + }
>> +
>> +}
>>
>> Modified: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (original)
>> +++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Fri Jul 22 00:33:59 2011
>> @@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
>> public class GoraRecordReader<K, T extends Persistent> extends RecordReader<K,T> {
>> public static final Logger LOG = LoggerFactory.getLogger(GoraRecordReader.class);
>>
>> - private static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
>> - private static final int BUFFER_LIMIT_READ_VALUE = 10000;
>> + public static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
>> + public static final int BUFFER_LIMIT_READ_VALUE = 10000;
>>
>> protected Query<K,T> query;
>> protected Result<K,T> result;
>>
>>
>>
>