You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/10 15:43:13 UTC
[2/5] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e42164b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e42164b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e42164b6
Branch: refs/heads/trunk
Commit: e42164b63baf7c86ac64078f68e61097c4741711
Parents: 1e64a9d 177f607
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Nov 10 14:39:16 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Nov 10 14:39:16 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/hadoop/cql3/CqlRecordWriter.java | 94 ++++++++++----------
2 files changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e42164b6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 33adefb,81ceb25..24e42c0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,45 -1,5 +1,47 @@@
-2.2.4
+3.0.1
++Merged from 2.2:
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+Merged from 2.1:
+ * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258)
+ * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605)
+
+
+3.0
+ * Fix AssertionError while flushing memtable due to materialized views
+ incorrectly inserting empty rows (CASSANDRA-10614)
+ * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650)
+ * Don't use -1 for the position of partition key in schema (CASSANDRA-10491)
+ * Fix distinct queries in mixed version cluster (CASSANDRA-10573)
+ * Skip sstable on clustering in names query (CASSANDRA-10571)
+ * Remove value skipping as it breaks read-repair (CASSANDRA-10655)
+ * Fix bootstrapping with MVs (CASSANDRA-10621)
+ * Make sure EACH_QUORUM reads are using NTS (CASSANDRA-10584)
+ * Fix MV replica filtering for non-NetworkTopologyStrategy (CASSANDRA-10634)
+ * (Hadoop) fix CIF describeSplits() not handling 0 size estimates (CASSANDRA-10600)
+ * Fix reading of legacy sstables (CASSANDRA-10590)
+ * Use CQL type names in schema metadata tables (CASSANDRA-10365)
+ * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
+ * Fix bug when adding a column to thrift with the same name as a primary key (CASSANDRA-10608)
+ * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)
+ * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
+ * Don't use 'names query' read path for counters (CASSANDRA-10572)
+ * Fix backward compatibility for counters (CASSANDRA-10470)
+ * Remove memory_allocator parameter from cassandra.yaml (CASSANDRA-10581,10628)
+ * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
+ * Fix thrift cas operations with defined columns (CASSANDRA-10576)
+ * Fix PartitionUpdate.operationCount() for updates with static column operations (CASSANDRA-10606)
+ * Fix thrift get() queries with defined columns (CASSANDRA-10586)
+ * Fix marking of indexes as built and removed (CASSANDRA-10601)
+ * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
+ * Fix batches on multiple tables (CASSANDRA-10554)
+ * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
+ * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
+ * Remove token generator (CASSANDRA-5261)
+ * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
+ * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
+ * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
+ * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
+Merged from 2.2:
* (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
* Use most up-to-date version of schema for system tables (CASSANDRA-10652)
* Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e42164b6/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 6b4caa5,14e24fb..23beba3
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@@ -108,31 -109,29 +108,29 @@@ class CqlRecordWriter extends RecordWri
CqlRecordWriter(Configuration conf)
{
this.conf = conf;
- this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
- batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+ this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+ batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32);
this.clients = new HashMap<>();
- try
+ try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
{
String keyspace = ConfigHelper.getOutputKeyspace(conf);
- try (Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
+ Session client = cluster.connect(keyspace);
+ ringCache = new NativeRingCache(conf);
+ if (client != null)
{
- ringCache = new NativeRingCache(conf);
- if (client != null)
- {
- TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
- clusterColumns = tableMetadata.getClusteringColumns();
- partitionKeyColumns = tableMetadata.getPartitionKey();
-
- String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
- if (cqlQuery.toLowerCase().startsWith("insert"))
- throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
- cql = appendKeyWhereClauses(cqlQuery);
- }
- else
- {
- throw new IllegalArgumentException("Invalid configuration specified " + conf);
- }
+ TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+ clusterColumns = tableMetadata.getClusteringColumns();
+ partitionKeyColumns = tableMetadata.getPartitionKey();
+
+ String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+ if (cqlQuery.toLowerCase().startsWith("insert"))
+ throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+ cql = appendKeyWhereClauses(cqlQuery);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid configuration specified " + conf);
}
}
catch (Exception e)
@@@ -298,34 -298,37 +297,37 @@@
while (true)
{
// send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
- try
+ if (session != null)
{
- int i = 0;
- PreparedStatement statement = preparedStatement(client);
- while (bindVariables != null)
+ try
{
- BoundStatement boundStatement = new BoundStatement(statement);
- for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ int i = 0;
+ PreparedStatement statement = preparedStatement(session);
+ while (bindVariables != null)
{
- boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ BoundStatement boundStatement = new BoundStatement(statement);
+ for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+ {
+ boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+ }
+ session.execute(boundStatement);
+ i++;
+
+ if (i >= batchThreshold)
+ break;
+ bindVariables = queue.poll();
}
- client.execute(boundStatement);
- i++;
-
- if (i >= batchThreshold)
- break;
- bindVariables = queue.poll();
+ break;
}
- break;
- }
- catch (Exception e)
- {
- closeInternal();
- if (!iter.hasNext())
+ catch (Exception e)
{
- lastException = new IOException(e);
- break outer;
+ closeInternal();
+ if (!iter.hasNext())
+ {
+ lastException = new IOException(e);
+ break outer;
+ }
- }
+ }
}
// attempt to connect to a different endpoint