You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/09/26 20:54:33 UTC
[1/6] git commit: Don't add extraneous field with CqlStorage Patch by
Sam Tunnicliffe, reviewed by Alex Liu for CASSANDRA-6071
Updated Branches:
refs/heads/cassandra-1.2 00e871d0f -> 389ff55e2
refs/heads/cassandra-2.0 d49303078 -> 006eec4a5
refs/heads/trunk 2c7b61b76 -> 246fefabf
Don't add extraneous field with CqlStorage
Patch by Sam Tunnicliffe, reviewed by Alex Liu for CASSANDRA-6071
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/389ff55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/389ff55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/389ff55e
Branch: refs/heads/cassandra-1.2
Commit: 389ff55e2bbc3046a6ad1aba85bdaab0e38dc6e8
Parents: 00e871d
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 26 13:49:07 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 26 13:49:07 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 151 ++-----------------
.../cassandra/hadoop/pig/CassandraStorage.java | 2 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 144 +++++++++++++++++-
3 files changed, 153 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 50671da..ce92014 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -641,7 +641,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
NotFoundException;
/** get column meta data */
- protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage)
+ protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
@@ -666,9 +666,13 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
List<CqlRow> rows = result.rows;
List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
- if (!cassandraStorage && (rows == null || rows.isEmpty()))
+ if (rows == null || rows.isEmpty())
{
- // check classic thrift tables
+ // if CassandraStorage, just return the empty list
+ if (cassandraStorage)
+ return columnDefs;
+
+ // otherwise for CqlStorage, check metadata for classic thrift tables
CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
for (ColumnIdentifier column : cfDefinition.metadata.keySet())
{
@@ -680,7 +684,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
cDef.validation_class = type;
columnDefs.add(cDef);
}
- if (columnDefs.size() == 0)
+ // we may not need to include the value column for compact tables as we
+ // could have already processed it as schema_columnfamilies.value_alias
+ if (columnDefs.size() == 0 && includeCompactValueColumn)
{
String value = cfDefinition.value != null ? cfDefinition.value.toString() : null;
if ("value".equals(value))
@@ -693,8 +699,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
return columnDefs;
}
- else if (rows == null || rows.isEmpty())
- return columnDefs;
Iterator<CqlRow> iterator = rows.iterator();
while (iterator.hasNext())
@@ -711,138 +715,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return columnDefs;
}
- /** get keys meta data */
- protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
- throws Exception
- {
- String query = "SELECT key_aliases, " +
- " column_aliases, " +
- " key_validator, " +
- " comparator, " +
- " keyspace_name, " +
- " value_alias, " +
- " default_validator " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s'" +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- List<ColumnDef> keys = new ArrayList<ColumnDef>();
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
- String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- logger.debug("Found ksDef name: {}", name);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
- logger.debug("partition keys: {}", keyString);
- List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
- Iterator<String> iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
- // classic thrift tables
- if (keys.size() == 0)
- {
- CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
- for (ColumnIdentifier column : cfDefinition.keys.keySet())
- {
- String key = column.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- for (ColumnIdentifier column : cfDefinition.columns.keySet())
- {
- String key = column.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- }
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
- logger.debug("cluster keys: {}", keyString);
- keyNames = FBUtilities.fromJsonList(keyString);
-
- iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
-
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
- logger.debug("row key validator: {}", validator);
- AbstractType<?> keyValidator = parseType(validator);
-
- Iterator<ColumnDef> keyItera = keys.iterator();
- if (keyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
- while (typeItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = keyValidator.toString();
-
- validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
- logger.debug("cluster key validator: {}", validator);
-
- if (keyItera.hasNext() && validator != null && !validator.isEmpty())
- {
- AbstractType<?> clusterKeyValidator = parseType(validator);
-
- if (clusterKeyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
- while (keyItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = clusterKeyValidator.toString();
- }
-
- // compact value_alias column
- if (cqlRow.columns.get(5).value != null)
- {
- try
- {
- String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
- logger.debug("default validator: {}", compactValidator);
- AbstractType<?> defaultValidator = parseType(compactValidator);
-
- ColumnDef cDef = new ColumnDef();
- cDef.name = cqlRow.columns.get(5).value;
- cDef.validation_class = defaultValidator.toString();
- keys.add(cDef);
- }
- catch (Exception e)
- {
- // no compact column at value_alias
- }
- }
-
- }
- return keys;
- }
-
/** get index type from string */
protected IndexType getIndexType(String type)
{
@@ -884,9 +756,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return indexes;
}
-
/** get CFDefinition of a column family */
- private CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
+ protected CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
throws NotFoundException,
InvalidRequestException,
TException,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 8cf06f2..09171a0 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -711,7 +711,7 @@ public class CassandraStorage extends AbstractCassandraStorage
if (cql3Table)
return new ArrayList<ColumnDef>();
- return getColumnMeta(client, true);
+ return getColumnMeta(client, true, true);
}
/** convert key to a tuple */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7780ca9..79abc2c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -23,6 +23,8 @@ import java.nio.charset.CharacterCodingException;
import java.util.*;
+import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.*;
@@ -31,6 +33,8 @@ import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.apache.hadoop.mapreduce.*;
import org.apache.pig.Expression;
import org.apache.pig.Expression.OpType;
@@ -61,7 +65,8 @@ public class CqlStorage extends AbstractCassandraStorage
private String columns;
private String outputQuery;
private String whereClause;
-
+ private boolean hasCompactValueAlias = false;
+
public CqlStorage()
{
this(1000);
@@ -450,7 +455,7 @@ public class CqlStorage extends AbstractCassandraStorage
}
// get other columns
- List<ColumnDef> columns = getColumnMeta(client, false);
+ List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
// combine all columns in a list
if (keyColumns != null && columns != null)
@@ -458,7 +463,140 @@ public class CqlStorage extends AbstractCassandraStorage
return keyColumns;
}
-
+
+ /** get keys meta data */
+ protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
+ throws Exception
+ {
+ String query = "SELECT key_aliases, " +
+ " column_aliases, " +
+ " key_validator, " +
+ " comparator, " +
+ " keyspace_name, " +
+ " value_alias, " +
+ " default_validator " +
+ "FROM system.schema_columnfamilies " +
+ "WHERE keyspace_name = '%s'" +
+ " AND columnfamily_name = '%s' ";
+
+ CqlResult result = client.execute_cql3_query(
+ ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+ Compression.NONE,
+ ConsistencyLevel.ONE);
+
+ if (result == null || result.rows == null || result.rows.isEmpty())
+ return null;
+
+ Iterator<CqlRow> iteraRow = result.rows.iterator();
+ List<ColumnDef> keys = new ArrayList<ColumnDef>();
+ if (iteraRow.hasNext())
+ {
+ CqlRow cqlRow = iteraRow.next();
+ String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+ logger.debug("Found ksDef name: {}", name);
+ String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+
+ logger.debug("partition keys: {}", keyString);
+ List<String> keyNames = FBUtilities.fromJsonList(keyString);
+
+ Iterator<String> iterator = keyNames.iterator();
+ while (iterator.hasNext())
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(iterator.next());
+ keys.add(cDef);
+ }
+ // classic thrift tables
+ if (keys.size() == 0)
+ {
+ CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
+ for (ColumnIdentifier column : cfDefinition.keys.keySet())
+ {
+ String key = column.toString();
+ logger.debug("name: {} ", key);
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(key);
+ keys.add(cDef);
+ }
+ for (ColumnIdentifier column : cfDefinition.columns.keySet())
+ {
+ String key = column.toString();
+ logger.debug("name: {} ", key);
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(key);
+ keys.add(cDef);
+ }
+ }
+
+ keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+
+ logger.debug("cluster keys: {}", keyString);
+ keyNames = FBUtilities.fromJsonList(keyString);
+
+ iterator = keyNames.iterator();
+ while (iterator.hasNext())
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(iterator.next());
+ keys.add(cDef);
+ }
+
+ String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
+ logger.debug("row key validator: {}", validator);
+ AbstractType<?> keyValidator = parseType(validator);
+
+ Iterator<ColumnDef> keyItera = keys.iterator();
+ if (keyValidator instanceof CompositeType)
+ {
+ Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
+ while (typeItera.hasNext())
+ keyItera.next().validation_class = typeItera.next().toString();
+ }
+ else
+ keyItera.next().validation_class = keyValidator.toString();
+
+ validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
+ logger.debug("cluster key validator: {}", validator);
+
+ if (keyItera.hasNext() && validator != null && !validator.isEmpty())
+ {
+ AbstractType<?> clusterKeyValidator = parseType(validator);
+
+ if (clusterKeyValidator instanceof CompositeType)
+ {
+ Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
+ while (keyItera.hasNext())
+ keyItera.next().validation_class = typeItera.next().toString();
+ }
+ else
+ keyItera.next().validation_class = clusterKeyValidator.toString();
+ }
+
+ // compact value_alias column
+ if (cqlRow.columns.get(5).value != null)
+ {
+ try
+ {
+ String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
+ logger.debug("default validator: {}", compactValidator);
+ AbstractType<?> defaultValidator = parseType(compactValidator);
+
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = cqlRow.columns.get(5).value;
+ cDef.validation_class = defaultValidator.toString();
+ keys.add(cDef);
+ hasCompactValueAlias = true;
+ }
+ catch (Exception e)
+ {
+ // no compact column at value_alias
+ }
+ }
+
+ }
+ return keys;
+ }
+
/** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
* [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
* [&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */
[3/6] git commit: Don't add extraneous field with CqlStorage Patch by
Sam Tunnicliffe, reviewed by Alex Liu for CASSANDRA-6071
Posted by br...@apache.org.
Don't add extraneous field with CqlStorage
Patch by Sam Tunnicliffe, reviewed by Alex Liu for CASSANDRA-6071
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/389ff55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/389ff55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/389ff55e
Branch: refs/heads/trunk
Commit: 389ff55e2bbc3046a6ad1aba85bdaab0e38dc6e8
Parents: 00e871d
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 26 13:49:07 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 26 13:49:07 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 151 ++-----------------
.../cassandra/hadoop/pig/CassandraStorage.java | 2 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 144 +++++++++++++++++-
3 files changed, 153 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 50671da..ce92014 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -641,7 +641,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
NotFoundException;
/** get column meta data */
- protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage)
+ protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
@@ -666,9 +666,13 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
List<CqlRow> rows = result.rows;
List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
- if (!cassandraStorage && (rows == null || rows.isEmpty()))
+ if (rows == null || rows.isEmpty())
{
- // check classic thrift tables
+ // if CassandraStorage, just return the empty list
+ if (cassandraStorage)
+ return columnDefs;
+
+ // otherwise for CqlStorage, check metadata for classic thrift tables
CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
for (ColumnIdentifier column : cfDefinition.metadata.keySet())
{
@@ -680,7 +684,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
cDef.validation_class = type;
columnDefs.add(cDef);
}
- if (columnDefs.size() == 0)
+ // we may not need to include the value column for compact tables as we
+ // could have already processed it as schema_columnfamilies.value_alias
+ if (columnDefs.size() == 0 && includeCompactValueColumn)
{
String value = cfDefinition.value != null ? cfDefinition.value.toString() : null;
if ("value".equals(value))
@@ -693,8 +699,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
return columnDefs;
}
- else if (rows == null || rows.isEmpty())
- return columnDefs;
Iterator<CqlRow> iterator = rows.iterator();
while (iterator.hasNext())
@@ -711,138 +715,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return columnDefs;
}
- /** get keys meta data */
- protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
- throws Exception
- {
- String query = "SELECT key_aliases, " +
- " column_aliases, " +
- " key_validator, " +
- " comparator, " +
- " keyspace_name, " +
- " value_alias, " +
- " default_validator " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s'" +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- List<ColumnDef> keys = new ArrayList<ColumnDef>();
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
- String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- logger.debug("Found ksDef name: {}", name);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
- logger.debug("partition keys: {}", keyString);
- List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
- Iterator<String> iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
- // classic thrift tables
- if (keys.size() == 0)
- {
- CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
- for (ColumnIdentifier column : cfDefinition.keys.keySet())
- {
- String key = column.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- for (ColumnIdentifier column : cfDefinition.columns.keySet())
- {
- String key = column.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- }
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
- logger.debug("cluster keys: {}", keyString);
- keyNames = FBUtilities.fromJsonList(keyString);
-
- iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
-
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
- logger.debug("row key validator: {}", validator);
- AbstractType<?> keyValidator = parseType(validator);
-
- Iterator<ColumnDef> keyItera = keys.iterator();
- if (keyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
- while (typeItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = keyValidator.toString();
-
- validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
- logger.debug("cluster key validator: {}", validator);
-
- if (keyItera.hasNext() && validator != null && !validator.isEmpty())
- {
- AbstractType<?> clusterKeyValidator = parseType(validator);
-
- if (clusterKeyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
- while (keyItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = clusterKeyValidator.toString();
- }
-
- // compact value_alias column
- if (cqlRow.columns.get(5).value != null)
- {
- try
- {
- String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
- logger.debug("default validator: {}", compactValidator);
- AbstractType<?> defaultValidator = parseType(compactValidator);
-
- ColumnDef cDef = new ColumnDef();
- cDef.name = cqlRow.columns.get(5).value;
- cDef.validation_class = defaultValidator.toString();
- keys.add(cDef);
- }
- catch (Exception e)
- {
- // no compact column at value_alias
- }
- }
-
- }
- return keys;
- }
-
/** get index type from string */
protected IndexType getIndexType(String type)
{
@@ -884,9 +756,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return indexes;
}
-
/** get CFDefinition of a column family */
- private CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
+ protected CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
throws NotFoundException,
InvalidRequestException,
TException,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 8cf06f2..09171a0 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -711,7 +711,7 @@ public class CassandraStorage extends AbstractCassandraStorage
if (cql3Table)
return new ArrayList<ColumnDef>();
- return getColumnMeta(client, true);
+ return getColumnMeta(client, true, true);
}
/** convert key to a tuple */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7780ca9..79abc2c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -23,6 +23,8 @@ import java.nio.charset.CharacterCodingException;
import java.util.*;
+import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.*;
@@ -31,6 +33,8 @@ import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.apache.hadoop.mapreduce.*;
import org.apache.pig.Expression;
import org.apache.pig.Expression.OpType;
@@ -61,7 +65,8 @@ public class CqlStorage extends AbstractCassandraStorage
private String columns;
private String outputQuery;
private String whereClause;
-
+ private boolean hasCompactValueAlias = false;
+
public CqlStorage()
{
this(1000);
@@ -450,7 +455,7 @@ public class CqlStorage extends AbstractCassandraStorage
}
// get other columns
- List<ColumnDef> columns = getColumnMeta(client, false);
+ List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
// combine all columns in a list
if (keyColumns != null && columns != null)
@@ -458,7 +463,140 @@ public class CqlStorage extends AbstractCassandraStorage
return keyColumns;
}
-
+
+ /** get keys meta data */
+ protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
+ throws Exception
+ {
+ String query = "SELECT key_aliases, " +
+ " column_aliases, " +
+ " key_validator, " +
+ " comparator, " +
+ " keyspace_name, " +
+ " value_alias, " +
+ " default_validator " +
+ "FROM system.schema_columnfamilies " +
+ "WHERE keyspace_name = '%s'" +
+ " AND columnfamily_name = '%s' ";
+
+ CqlResult result = client.execute_cql3_query(
+ ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+ Compression.NONE,
+ ConsistencyLevel.ONE);
+
+ if (result == null || result.rows == null || result.rows.isEmpty())
+ return null;
+
+ Iterator<CqlRow> iteraRow = result.rows.iterator();
+ List<ColumnDef> keys = new ArrayList<ColumnDef>();
+ if (iteraRow.hasNext())
+ {
+ CqlRow cqlRow = iteraRow.next();
+ String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+ logger.debug("Found ksDef name: {}", name);
+ String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+
+ logger.debug("partition keys: {}", keyString);
+ List<String> keyNames = FBUtilities.fromJsonList(keyString);
+
+ Iterator<String> iterator = keyNames.iterator();
+ while (iterator.hasNext())
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(iterator.next());
+ keys.add(cDef);
+ }
+ // classic thrift tables
+ if (keys.size() == 0)
+ {
+ CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
+ for (ColumnIdentifier column : cfDefinition.keys.keySet())
+ {
+ String key = column.toString();
+ logger.debug("name: {} ", key);
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(key);
+ keys.add(cDef);
+ }
+ for (ColumnIdentifier column : cfDefinition.columns.keySet())
+ {
+ String key = column.toString();
+ logger.debug("name: {} ", key);
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(key);
+ keys.add(cDef);
+ }
+ }
+
+ keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+
+ logger.debug("cluster keys: {}", keyString);
+ keyNames = FBUtilities.fromJsonList(keyString);
+
+ iterator = keyNames.iterator();
+ while (iterator.hasNext())
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(iterator.next());
+ keys.add(cDef);
+ }
+
+ String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
+ logger.debug("row key validator: {}", validator);
+ AbstractType<?> keyValidator = parseType(validator);
+
+ Iterator<ColumnDef> keyItera = keys.iterator();
+ if (keyValidator instanceof CompositeType)
+ {
+ Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
+ while (typeItera.hasNext())
+ keyItera.next().validation_class = typeItera.next().toString();
+ }
+ else
+ keyItera.next().validation_class = keyValidator.toString();
+
+ validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
+ logger.debug("cluster key validator: {}", validator);
+
+ if (keyItera.hasNext() && validator != null && !validator.isEmpty())
+ {
+ AbstractType<?> clusterKeyValidator = parseType(validator);
+
+ if (clusterKeyValidator instanceof CompositeType)
+ {
+ Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
+ while (keyItera.hasNext())
+ keyItera.next().validation_class = typeItera.next().toString();
+ }
+ else
+ keyItera.next().validation_class = clusterKeyValidator.toString();
+ }
+
+ // compact value_alias column
+ if (cqlRow.columns.get(5).value != null)
+ {
+ try
+ {
+ String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
+ logger.debug("default validator: {}", compactValidator);
+ AbstractType<?> defaultValidator = parseType(compactValidator);
+
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = cqlRow.columns.get(5).value;
+ cDef.validation_class = defaultValidator.toString();
+ keys.add(cDef);
+ hasCompactValueAlias = true;
+ }
+ catch (Exception e)
+ {
+ // no compact column at value_alias
+ }
+ }
+
+ }
+ return keys;
+ }
+
/** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
* [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
* [&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */
[6/6] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/246fefab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/246fefab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/246fefab
Branch: refs/heads/trunk
Commit: 246fefabfbae20b6c17a1898bb838d7463ccad0a
Parents: 2c7b61b 006eec4
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 26 13:53:58 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 26 13:53:58 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 151 ++-----------------
.../cassandra/hadoop/pig/CassandraStorage.java | 2 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 144 +++++++++++++++++-
3 files changed, 153 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
[4/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/006eec4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/006eec4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/006eec4a
Branch: refs/heads/trunk
Commit: 006eec4a5dc76d79f3147ab1e1e78e17e304a88c
Parents: d493030 389ff55
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 26 13:53:46 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 26 13:53:46 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 151 ++-----------------
.../cassandra/hadoop/pig/CassandraStorage.java | 2 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 144 +++++++++++++++++-
3 files changed, 153 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/006eec4a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/006eec4a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index e66f585,09171a0..c9afff0
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -700,12 -698,20 +700,12 @@@ public class CassandraStorage extends A
/** get a list of column for the column family */
protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
+ throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
{
if (cql3Table)
- return new ArrayList<ColumnDef>();
+ return new ArrayList<>();
- return getColumnMeta(client, true);
+ return getColumnMeta(client, true, true);
}
/** convert key to a tuple */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/006eec4a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 86fe338,79abc2c..b96d10e
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@@ -23,6 -23,9 +23,8 @@@ import java.nio.charset.CharacterCoding
import java.util.*;
+ import org.apache.cassandra.cql3.CFDefinition;
+ import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;
[2/6] git commit: Don't add extraneous field with CqlStorage Patch by
Sam Tunnicliffe, reviewed by Alex Liu for CASSANDRA-6071
Posted by br...@apache.org.
Don't add extraneous field with CqlStorage
Patch by Sam Tunnicliffe, reviewed by Alex Liu for CASSANDRA-6071
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/389ff55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/389ff55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/389ff55e
Branch: refs/heads/cassandra-2.0
Commit: 389ff55e2bbc3046a6ad1aba85bdaab0e38dc6e8
Parents: 00e871d
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 26 13:49:07 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 26 13:49:07 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 151 ++-----------------
.../cassandra/hadoop/pig/CassandraStorage.java | 2 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 144 +++++++++++++++++-
3 files changed, 153 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 50671da..ce92014 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -641,7 +641,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
NotFoundException;
/** get column meta data */
- protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage)
+ protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
throws InvalidRequestException,
UnavailableException,
TimedOutException,
@@ -666,9 +666,13 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
List<CqlRow> rows = result.rows;
List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
- if (!cassandraStorage && (rows == null || rows.isEmpty()))
+ if (rows == null || rows.isEmpty())
{
- // check classic thrift tables
+ // if CassandraStorage, just return the empty list
+ if (cassandraStorage)
+ return columnDefs;
+
+ // otherwise for CqlStorage, check metadata for classic thrift tables
CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
for (ColumnIdentifier column : cfDefinition.metadata.keySet())
{
@@ -680,7 +684,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
cDef.validation_class = type;
columnDefs.add(cDef);
}
- if (columnDefs.size() == 0)
+ // we may not need to include the value column for compact tables as we
+ // could have already processed it as schema_columnfamilies.value_alias
+ if (columnDefs.size() == 0 && includeCompactValueColumn)
{
String value = cfDefinition.value != null ? cfDefinition.value.toString() : null;
if ("value".equals(value))
@@ -693,8 +699,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
}
return columnDefs;
}
- else if (rows == null || rows.isEmpty())
- return columnDefs;
Iterator<CqlRow> iterator = rows.iterator();
while (iterator.hasNext())
@@ -711,138 +715,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return columnDefs;
}
- /** get keys meta data */
- protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
- throws Exception
- {
- String query = "SELECT key_aliases, " +
- " column_aliases, " +
- " key_validator, " +
- " comparator, " +
- " keyspace_name, " +
- " value_alias, " +
- " default_validator " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s'" +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- List<ColumnDef> keys = new ArrayList<ColumnDef>();
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
- String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- logger.debug("Found ksDef name: {}", name);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
- logger.debug("partition keys: {}", keyString);
- List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
- Iterator<String> iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
- // classic thrift tables
- if (keys.size() == 0)
- {
- CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
- for (ColumnIdentifier column : cfDefinition.keys.keySet())
- {
- String key = column.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- for (ColumnIdentifier column : cfDefinition.columns.keySet())
- {
- String key = column.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- }
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
- logger.debug("cluster keys: {}", keyString);
- keyNames = FBUtilities.fromJsonList(keyString);
-
- iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
-
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
- logger.debug("row key validator: {}", validator);
- AbstractType<?> keyValidator = parseType(validator);
-
- Iterator<ColumnDef> keyItera = keys.iterator();
- if (keyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
- while (typeItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = keyValidator.toString();
-
- validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
- logger.debug("cluster key validator: {}", validator);
-
- if (keyItera.hasNext() && validator != null && !validator.isEmpty())
- {
- AbstractType<?> clusterKeyValidator = parseType(validator);
-
- if (clusterKeyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
- while (keyItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = clusterKeyValidator.toString();
- }
-
- // compact value_alias column
- if (cqlRow.columns.get(5).value != null)
- {
- try
- {
- String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
- logger.debug("default validator: {}", compactValidator);
- AbstractType<?> defaultValidator = parseType(compactValidator);
-
- ColumnDef cDef = new ColumnDef();
- cDef.name = cqlRow.columns.get(5).value;
- cDef.validation_class = defaultValidator.toString();
- keys.add(cDef);
- }
- catch (Exception e)
- {
- // no compact column at value_alias
- }
- }
-
- }
- return keys;
- }
-
/** get index type from string */
protected IndexType getIndexType(String type)
{
@@ -884,9 +756,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
return indexes;
}
-
/** get CFDefinition of a column family */
- private CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
+ protected CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
throws NotFoundException,
InvalidRequestException,
TException,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 8cf06f2..09171a0 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -711,7 +711,7 @@ public class CassandraStorage extends AbstractCassandraStorage
if (cql3Table)
return new ArrayList<ColumnDef>();
- return getColumnMeta(client, true);
+ return getColumnMeta(client, true, true);
}
/** convert key to a tuple */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7780ca9..79abc2c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -23,6 +23,8 @@ import java.nio.charset.CharacterCodingException;
import java.util.*;
+import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.*;
@@ -31,6 +33,8 @@ import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
import org.apache.hadoop.mapreduce.*;
import org.apache.pig.Expression;
import org.apache.pig.Expression.OpType;
@@ -61,7 +65,8 @@ public class CqlStorage extends AbstractCassandraStorage
private String columns;
private String outputQuery;
private String whereClause;
-
+ private boolean hasCompactValueAlias = false;
+
public CqlStorage()
{
this(1000);
@@ -450,7 +455,7 @@ public class CqlStorage extends AbstractCassandraStorage
}
// get other columns
- List<ColumnDef> columns = getColumnMeta(client, false);
+ List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
// combine all columns in a list
if (keyColumns != null && columns != null)
@@ -458,7 +463,140 @@ public class CqlStorage extends AbstractCassandraStorage
return keyColumns;
}
-
+
+ /** get keys meta data */
+ protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
+ throws Exception
+ {
+ String query = "SELECT key_aliases, " +
+ " column_aliases, " +
+ " key_validator, " +
+ " comparator, " +
+ " keyspace_name, " +
+ " value_alias, " +
+ " default_validator " +
+ "FROM system.schema_columnfamilies " +
+ "WHERE keyspace_name = '%s'" +
+ " AND columnfamily_name = '%s' ";
+
+ CqlResult result = client.execute_cql3_query(
+ ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+ Compression.NONE,
+ ConsistencyLevel.ONE);
+
+ if (result == null || result.rows == null || result.rows.isEmpty())
+ return null;
+
+ Iterator<CqlRow> iteraRow = result.rows.iterator();
+ List<ColumnDef> keys = new ArrayList<ColumnDef>();
+ if (iteraRow.hasNext())
+ {
+ CqlRow cqlRow = iteraRow.next();
+ String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+ logger.debug("Found ksDef name: {}", name);
+ String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+
+ logger.debug("partition keys: {}", keyString);
+ List<String> keyNames = FBUtilities.fromJsonList(keyString);
+
+ Iterator<String> iterator = keyNames.iterator();
+ while (iterator.hasNext())
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(iterator.next());
+ keys.add(cDef);
+ }
+ // classic thrift tables
+ if (keys.size() == 0)
+ {
+ CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
+ for (ColumnIdentifier column : cfDefinition.keys.keySet())
+ {
+ String key = column.toString();
+ logger.debug("name: {} ", key);
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(key);
+ keys.add(cDef);
+ }
+ for (ColumnIdentifier column : cfDefinition.columns.keySet())
+ {
+ String key = column.toString();
+ logger.debug("name: {} ", key);
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(key);
+ keys.add(cDef);
+ }
+ }
+
+ keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+
+ logger.debug("cluster keys: {}", keyString);
+ keyNames = FBUtilities.fromJsonList(keyString);
+
+ iterator = keyNames.iterator();
+ while (iterator.hasNext())
+ {
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = ByteBufferUtil.bytes(iterator.next());
+ keys.add(cDef);
+ }
+
+ String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
+ logger.debug("row key validator: {}", validator);
+ AbstractType<?> keyValidator = parseType(validator);
+
+ Iterator<ColumnDef> keyItera = keys.iterator();
+ if (keyValidator instanceof CompositeType)
+ {
+ Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
+ while (typeItera.hasNext())
+ keyItera.next().validation_class = typeItera.next().toString();
+ }
+ else
+ keyItera.next().validation_class = keyValidator.toString();
+
+ validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
+ logger.debug("cluster key validator: {}", validator);
+
+ if (keyItera.hasNext() && validator != null && !validator.isEmpty())
+ {
+ AbstractType<?> clusterKeyValidator = parseType(validator);
+
+ if (clusterKeyValidator instanceof CompositeType)
+ {
+ Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
+ while (keyItera.hasNext())
+ keyItera.next().validation_class = typeItera.next().toString();
+ }
+ else
+ keyItera.next().validation_class = clusterKeyValidator.toString();
+ }
+
+ // compact value_alias column
+ if (cqlRow.columns.get(5).value != null)
+ {
+ try
+ {
+ String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
+ logger.debug("default validator: {}", compactValidator);
+ AbstractType<?> defaultValidator = parseType(compactValidator);
+
+ ColumnDef cDef = new ColumnDef();
+ cDef.name = cqlRow.columns.get(5).value;
+ cDef.validation_class = defaultValidator.toString();
+ keys.add(cDef);
+ hasCompactValueAlias = true;
+ }
+ catch (Exception e)
+ {
+ // no compact column at value_alias
+ }
+ }
+
+ }
+ return keys;
+ }
+
/** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
* [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
* [&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */
[5/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/006eec4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/006eec4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/006eec4a
Branch: refs/heads/cassandra-2.0
Commit: 006eec4a5dc76d79f3147ab1e1e78e17e304a88c
Parents: d493030 389ff55
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 26 13:53:46 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 26 13:53:46 2013 -0500
----------------------------------------------------------------------
.../hadoop/pig/AbstractCassandraStorage.java | 151 ++-----------------
.../cassandra/hadoop/pig/CassandraStorage.java | 2 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 144 +++++++++++++++++-
3 files changed, 153 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/006eec4a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/006eec4a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index e66f585,09171a0..c9afff0
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -700,12 -698,20 +700,12 @@@ public class CassandraStorage extends A
/** get a list of column for the column family */
protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
+ throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
{
if (cql3Table)
- return new ArrayList<ColumnDef>();
+ return new ArrayList<>();
- return getColumnMeta(client, true);
+ return getColumnMeta(client, true, true);
}
/** convert key to a tuple */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/006eec4a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 86fe338,79abc2c..b96d10e
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@@ -23,6 -23,9 +23,8 @@@ import java.nio.charset.CharacterCoding
import java.util.*;
+ import org.apache.cassandra.cql3.CFDefinition;
+ import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;