Posted to commits@cassandra.apache.org by al...@apache.org on 2020/04/23 13:53:25 UTC
[cassandra] branch trunk updated: Fix CqlInputFormat regression from the switch to system.size_estimates
This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a8327eb Fix CqlInputFormat regression from the switch to system.size_estimates
a8327eb is described below
commit a8327eb8868c8d9d03c253a88509ce64d2ac227b
Author: David Capwell <dc...@gmail.com>
AuthorDate: Thu Mar 12 10:29:37 2020 -0700
Fix CqlInputFormat regression from the switch to system.size_estimates
patch by David Capwell; reviewed by Aleksey Yeschenko and Brandon Williams for CASSANDRA-15637
---
CHANGES.txt | 1 +
.../apache/cassandra/db/SizeEstimatesRecorder.java | 48 +++-
.../org/apache/cassandra/db/SystemKeyspace.java | 79 +++++-
.../cassandra/hadoop/cql3/CqlClientHelper.java | 91 +++++++
.../cassandra/hadoop/cql3/CqlInputFormat.java | 298 ++++++++++++++++-----
.../LimitedLocalNodeFirstLocalBalancingPolicy.java | 18 +-
.../apache/cassandra/service/StorageService.java | 30 ++-
.../service/StorageServiceServerTest.java | 39 +++
8 files changed, 508 insertions(+), 96 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d7e764c..ab8a7eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
* Allow sending Entire SSTables over SSL (CASSANDRA-15740)
* Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739)
* Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730)
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index d61c297..fe38d64 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -70,12 +70,39 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
for (Keyspace keyspace : Keyspace.nonLocalStrategy())
{
- Collection<Range<Token>> localRanges = StorageService.instance.getPrimaryRangesForEndpoint(keyspace.getName(),
- FBUtilities.getBroadcastAddressAndPort());
+ // In tools, the call to describe_splits_ex() used to be coupled with the call to describe_local_ring(), so
+ // most access was for the local primary range; after the size_estimates table was created this was changed
+ // to the primary range.
+ // In a multi-DC setup it's not uncommon for each DC's local ring to be offset by 1 from the next DC; example:
+ // DC1: [0, 10, 20, 30]
+ // DC2: [1, 11, 21, 31]
+ // DC3: [2, 12, 22, 32]
+ // When working with the full primary ring we have:
+ // [0, 1, 2, 10, 11, 12, 20, 21, 22, 30, 31, 32]
+ // which then leads to primary ranges with a single token in them, causing the estimates to be less useful.
+ // Since only one range was published, some tools rely on that assumption; for this reason we can't publish
+ // all ranges (including the replica ranges), nor can we keep backwards compatibility by publishing only the
+ // primary range. If we publish multiple ranges, downstream integrations may start to see duplicate data.
+ // See CASSANDRA-15637
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRanges(keyspace.getName());
+ Collection<Range<Token>> localPrimaryRanges = StorageService.instance.getLocalPrimaryRange();
+ boolean rangesAreEqual = primaryRanges.equals(localPrimaryRanges);
for (ColumnFamilyStore table : keyspace.getColumnFamilyStores())
{
long start = System.nanoTime();
- recordSizeEstimates(table, localRanges);
+
+ // compute estimates for primary ranges for backwards compatibility
+ Map<Range<Token>, Pair<Long, Long>> estimates = computeSizeEstimates(table, primaryRanges);
+ SystemKeyspace.updateSizeEstimates(table.metadata.keyspace, table.metadata.name, estimates);
+ SystemKeyspace.updateTableEstimates(table.metadata.keyspace, table.metadata.name, SystemKeyspace.TABLE_ESTIMATES_TYPE_PRIMARY, estimates);
+
+ if (!rangesAreEqual)
+ {
+ // compute estimate for local primary range
+ estimates = computeSizeEstimates(table, localPrimaryRanges);
+ }
+ SystemKeyspace.updateTableEstimates(table.metadata.keyspace, table.metadata.name, SystemKeyspace.TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY, estimates);
+
long passed = System.nanoTime() - start;
if (logger.isTraceEnabled())
logger.trace("Spent {} milliseconds on estimating {}.{} size",
@@ -87,11 +114,11 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
}
@SuppressWarnings("resource")
- private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
+ private static Map<Range<Token>, Pair<Long, Long>> computeSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> ranges)
{
// for each local primary range, estimate (crudely) mean partition size and partitions count.
- Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(localRanges.size());
- for (Range<Token> localRange : localRanges)
+ Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(ranges.size());
+ for (Range<Token> localRange : ranges)
{
for (Range<Token> unwrappedRange : localRange.unwrap())
{
@@ -124,11 +151,10 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
}
}
- // atomically update the estimates.
- SystemKeyspace.updateSizeEstimates(table.metadata.keyspace, table.metadata.name, estimates);
+ return estimates;
}
- private long estimatePartitionsCount(Collection<SSTableReader> sstables, Range<Token> range)
+ private static long estimatePartitionsCount(Collection<SSTableReader> sstables, Range<Token> range)
{
long count = 0;
for (SSTableReader sstable : sstables)
@@ -136,7 +162,7 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
return count;
}
- private long estimateMeanPartitionSize(Collection<SSTableReader> sstables)
+ private static long estimateMeanPartitionSize(Collection<SSTableReader> sstables)
{
long sum = 0, count = 0;
for (SSTableReader sstable : sstables)
@@ -151,6 +177,6 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
@Override
public void onDropTable(String keyspace, String table)
{
- SystemKeyspace.clearSizeEstimates(keyspace, table);
+ SystemKeyspace.clearEstimates(keyspace, table);
}
}
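
To make the multi-DC reasoning in the comment above concrete, here is a minimal, standalone sketch (not part of the patch) that computes primary ranges over a ring of plain longs standing in for Cassandra's Token type: with the offset-by-1 layout, a DC2 node's cluster-wide primary ranges are each only one token wide, while its local (per-DC) primary ranges still cover a quarter of the ring.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Standalone illustration only; longs stand in for Cassandra's Token type.
public class PrimaryRangeSketch
{
    public static void main(String[] args)
    {
        List<Long> dc1 = List.of(0L, 10L, 20L, 30L);
        List<Long> dc2 = List.of(1L, 11L, 21L, 31L);
        List<Long> dc3 = List.of(2L, 12L, 22L, 32L);

        // Cluster-wide ring: the DC2 node owning {1, 11, 21, 31} only gets
        // (0, 1], (10, 11], (20, 21], (30, 31] -- each a single token wide.
        System.out.println("cluster-wide primary ranges: " + primaryRanges(merge(List.of(dc1, dc2, dc3)), dc2));

        // DC-local ring: the same node gets (31, 1], (1, 11], (11, 21], (21, 31].
        System.out.println("local primary ranges:        " + primaryRanges(dc2, dc2));
    }

    // primary ranges = ranges whose right token is owned by the node, taken over the sorted ring
    static List<String> primaryRanges(List<Long> ring, List<Long> ownedTokens)
    {
        List<String> ranges = new ArrayList<>();
        for (int i = 0; i < ring.size(); i++)
        {
            long start = ring.get((i - 1 + ring.size()) % ring.size());
            long end = ring.get(i);
            if (ownedTokens.contains(end))
                ranges.add("(" + start + ", " + end + "]");
        }
        return ranges;
    }

    static List<Long> merge(List<List<Long>> dcs)
    {
        List<Long> ring = new ArrayList<>();
        dcs.forEach(ring::addAll);
        Collections.sort(ring);
        return ring;
    }
}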
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index c427c8f..eb31f2d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -100,7 +100,9 @@ public final class SystemKeyspace
public static final String PEER_EVENTS_V2 = "peer_events_v2";
public static final String COMPACTION_HISTORY = "compaction_history";
public static final String SSTABLE_ACTIVITY = "sstable_activity";
- public static final String SIZE_ESTIMATES = "size_estimates";
+ public static final String TABLE_ESTIMATES = "table_estimates";
+ public static final String TABLE_ESTIMATES_TYPE_PRIMARY = "primary";
+ public static final String TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY = "local_primary";
public static final String AVAILABLE_RANGES_V2 = "available_ranges_v2";
public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2";
public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress";
@@ -112,6 +114,7 @@ public final class SystemKeyspace
@Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
@Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges";
@Deprecated public static final String LEGACY_AVAILABLE_RANGES = "available_ranges";
+ @Deprecated public static final String LEGACY_SIZE_ESTIMATES = "size_estimates";
public static final TableMetadata Batches =
@@ -237,9 +240,10 @@ public final class SystemKeyspace
+ "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))")
.build();
- private static final TableMetadata SizeEstimates =
- parse(SIZE_ESTIMATES,
- "per-table primary range size estimates",
+ @Deprecated
+ private static final TableMetadata LegacySizeEstimates =
+ parse(LEGACY_SIZE_ESTIMATES,
+ "per-table primary range size estimates, table is deprecated in favor of " + TABLE_ESTIMATES,
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "table_name text,"
@@ -250,6 +254,20 @@ public final class SystemKeyspace
+ "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
.build();
+ private static final TableMetadata TableEstimates =
+ parse(TABLE_ESTIMATES,
+ "per-table range size estimates",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "table_name text,"
+ + "range_type text,"
+ + "range_start text,"
+ + "range_end text,"
+ + "mean_partition_size bigint,"
+ + "partitions_count bigint,"
+ + "PRIMARY KEY ((keyspace_name), table_name, range_type, range_start, range_end))")
+ .build();
+
private static final TableMetadata AvailableRangesV2 =
parse(AVAILABLE_RANGES_V2,
"available keyspace/ranges during bootstrap/replace that are ready to be served",
@@ -397,7 +415,8 @@ public final class SystemKeyspace
LegacyPeerEvents,
CompactionHistory,
SSTableActivity,
- SizeEstimates,
+ LegacySizeEstimates,
+ TableEstimates,
AvailableRangesV2,
LegacyAvailableRanges,
TransferredRangesV2,
@@ -1248,40 +1267,74 @@ public final class SystemKeyspace
public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
{
long timestamp = FBUtilities.timestampMicros();
- PartitionUpdate.Builder update = new PartitionUpdate.Builder(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size());
+ int nowInSec = FBUtilities.nowInSeconds();
+ PartitionUpdate.Builder update = new PartitionUpdate.Builder(LegacySizeEstimates, UTF8Type.instance.decompose(keyspace), LegacySizeEstimates.regularAndStaticColumns(), estimates.size());
// delete all previous values with a single range tombstone.
+ update.add(new RangeTombstone(Slice.make(LegacySizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
+
+ // add a CQL row for each primary token range.
+ for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
+ {
+ Range<Token> range = entry.getKey();
+ Pair<Long, Long> values = entry.getValue();
+ update.add(Rows.simpleBuilder(LegacySizeEstimates, table, range.left.toString(), range.right.toString())
+ .timestamp(timestamp)
+ .add("partitions_count", values.left)
+ .add("mean_partition_size", values.right)
+ .build());
+ }
+ new Mutation(update.build()).apply();
+ }
+
+ /**
+ * Writes the current partition count and size estimates into table_estimates
+ */
+ public static void updateTableEstimates(String keyspace, String table, String type, Map<Range<Token>, Pair<Long, Long>> estimates)
+ {
+ long timestamp = FBUtilities.timestampMicros();
int nowInSec = FBUtilities.nowInSeconds();
- update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
+ PartitionUpdate.Builder update = new PartitionUpdate.Builder(TableEstimates, UTF8Type.instance.decompose(keyspace), TableEstimates.regularAndStaticColumns(), estimates.size());
+
+ // delete all previous values with a single range tombstone.
+ update.add(new RangeTombstone(Slice.make(TableEstimates.comparator, table, type), new DeletionTime(timestamp - 1, nowInSec)));
// add a CQL row for each primary token range.
for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
{
Range<Token> range = entry.getKey();
Pair<Long, Long> values = entry.getValue();
- update.add(Rows.simpleBuilder(SizeEstimates, table, range.left.toString(), range.right.toString())
+ update.add(Rows.simpleBuilder(TableEstimates, table, type, range.left.toString(), range.right.toString())
.timestamp(timestamp)
.add("partitions_count", values.left)
.add("mean_partition_size", values.right)
.build());
}
+
new Mutation(update.build()).apply();
}
+
/**
* Clears size estimates for a table (on table drop)
*/
- public static void clearSizeEstimates(String keyspace, String table)
+ public static void clearEstimates(String keyspace, String table)
{
- String cql = format("DELETE FROM %s WHERE keyspace_name = ? AND table_name = ?", SizeEstimates.toString());
+ String cqlFormat = "DELETE FROM %s WHERE keyspace_name = ? AND table_name = ?";
+ String cql = format(cqlFormat, LegacySizeEstimates.toString());
+ executeInternal(cql, keyspace, table);
+ cql = String.format(cqlFormat, TableEstimates.toString());
executeInternal(cql, keyspace, table);
}
/**
* Clears size estimates for a keyspace (used to manually clean when we miss a keyspace drop)
*/
- public static void clearSizeEstimates(String keyspace)
+ public static void clearEstimates(String keyspace)
{
- String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+ String cqlFormat = "DELETE FROM %s WHERE keyspace_name = ?";
+ String cql = String.format(cqlFormat, LegacySizeEstimates.toString());
+ executeInternal(cql, keyspace);
+ cql = String.format(cqlFormat, TableEstimates.toString());
executeInternal(cql, keyspace);
}
@@ -1291,7 +1344,7 @@ public final class SystemKeyspace
public static synchronized SetMultimap<String, String> getTablesWithSizeEstimates()
{
SetMultimap<String, String> keyspaceTableMap = HashMultimap.create();
- String cql = String.format("SELECT keyspace_name, table_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+ String cql = String.format("SELECT keyspace_name, table_name FROM %s", TableEstimates.toString(), TABLE_ESTIMATES_TYPE_PRIMARY);
UntypedResultSet rs = executeInternal(cql);
for (UntypedResultSet.Row row : rs)
{
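
The new system.table_estimates table keys rows by ((keyspace_name), table_name, range_type, range_start, range_end), so 'primary' and 'local_primary' estimates can live side by side while the legacy system.size_estimates table keeps being written for older tools. As a rough read-side sketch (assuming a reachable node on 127.0.0.1 and hypothetical keyspace/table names; the Hadoop code further down in this patch issues essentially the same query, plus a fallback to the legacy table):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

// Illustration only: fetch the local-primary-range estimates for one table.
public class TableEstimatesReadSketch
{
    public static void main(String[] args)
    {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect())
        {
            ResultSet rs = session.execute(
                "SELECT range_start, range_end, partitions_count, mean_partition_size " +
                "FROM system.table_estimates " +
                "WHERE keyspace_name = ? AND table_name = ? AND range_type = 'local_primary'",
                "my_keyspace", "my_table");  // hypothetical names

            for (Row row : rs)
                System.out.printf("(%s, %s] ~%d partitions, mean size %d bytes%n",
                                  row.getString("range_start"), row.getString("range_end"),
                                  row.getLong("partitions_count"), row.getLong("mean_partition_size"));
        }
    }
}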
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java
new file mode 100644
index 0000000..d154243
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java
@@ -0,0 +1,91 @@
+package org.apache.cassandra.hadoop.cql3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.TokenRange;
+
+public class CqlClientHelper
+{
+ private CqlClientHelper()
+ {
+ }
+
+ public static Map<TokenRange, List<Host>> getLocalPrimaryRangeForDC(String keyspace, Metadata metadata, String targetDC)
+ {
+ Objects.requireNonNull(keyspace, "keyspace");
+ Objects.requireNonNull(metadata, "metadata");
+ Objects.requireNonNull(targetDC, "targetDC");
+
+ // In 2.1 the logic was to take a set of nodes used as seeds and query
+ // client.describe_local_ring(keyspace) -> List<TokenRange>; this included all nodes in the local DC.
+ // TokenRange contained the endpoints in order, so .endpoints.get(0) was the primary owner.
+ // The client does not have a similar API; instead it returns Set<Host>. To replicate this we first
+ // need to compute the primary owners, then add in the replicas.
+
+ List<Token> tokens = new ArrayList<>();
+ Map<Token, Host> tokenToHost = new HashMap<>();
+ for (Host host : metadata.getAllHosts())
+ {
+ if (!targetDC.equals(host.getDatacenter()))
+ continue;
+
+ for (Token token : host.getTokens())
+ {
+ Host previous = tokenToHost.putIfAbsent(token, host);
+ if (previous != null)
+ throw new IllegalStateException("Two hosts share the same token; hosts " + host.getHostId() + ":"
+ + host.getTokens() + ", " + previous.getHostId() + ":" + previous.getTokens());
+ tokens.add(token);
+ }
+ }
+ Collections.sort(tokens);
+
+ Map<TokenRange, List<Host>> rangeToReplicas = new HashMap<>();
+
+ // The first token in the ring uses the last token as its 'start', handle this here to simplify the loop
+ Token start = tokens.get(tokens.size() - 1);
+ Token end = tokens.get(0);
+
+ addRange(keyspace, metadata, tokenToHost, rangeToReplicas, start, end);
+ for (int i = 1; i < tokens.size(); i++)
+ {
+ start = tokens.get(i - 1);
+ end = tokens.get(i);
+
+ addRange(keyspace, metadata, tokenToHost, rangeToReplicas, start, end);
+ }
+
+ return rangeToReplicas;
+ }
+
+ private static void addRange(String keyspace,
+ Metadata metadata,
+ Map<Token, Host> tokenToHost,
+ Map<TokenRange, List<Host>> rangeToReplicas,
+ Token start, Token end)
+ {
+ Host host = tokenToHost.get(end);
+ String dc = host.getDatacenter();
+
+ TokenRange range = metadata.newTokenRange(start, end);
+ List<Host> replicas = new ArrayList<>();
+ replicas.add(host);
+ // get all the replicas for the specific DC
+ for (Host replica : metadata.getReplicas(keyspace, range))
+ {
+ if (dc.equals(replica.getDatacenter()) && !host.equals(replica))
+ replicas.add(replica);
+ }
+ List<Host> previous = rangeToReplicas.put(range, replicas);
+ if (previous != null)
+ throw new IllegalStateException("Two hosts (" + host + ", " + previous + ") map to the same token range: " + range);
+ }
+}
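
A possible way to drive the new helper from client code, sketched under the assumption of a reachable contact point, a keyspace named "my_keyspace" and a datacenter named "DC1" (all hypothetical):

import java.util.List;
import java.util.Map;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.TokenRange;

// Usage sketch only; not part of the patch itself.
public class CqlClientHelperUsage
{
    public static void main(String[] args)
    {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build())
        {
            Metadata metadata = cluster.getMetadata();
            Map<TokenRange, List<Host>> ranges =
                CqlClientHelper.getLocalPrimaryRangeForDC("my_keyspace", metadata, "DC1");

            // the first host in each list is the primary owner, the rest are the DC-local replicas
            ranges.forEach((range, hosts) ->
                System.out.println(range + " -> primary " + hosts.get(0)
                                   + ", replicas " + hosts.subList(1, hosts.size())));
        }
    }
}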
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index eae1fa2..262965f 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.hadoop.cql3;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
@@ -27,9 +28,16 @@ import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TokenRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
@@ -130,10 +138,15 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
- try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf);
+ String[] inputInitialAddress = ConfigHelper.getInputInitialAddress(conf).split(",");
+ try (Cluster cluster = CqlConfigHelper.getInputCluster(inputInitialAddress, conf);
Session session = cluster.connect())
{
- List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
+ List<SplitFuture> splitfutures = new ArrayList<>();
+ //TODO if the job range is defined and does not perfectly match tokens, then the logic will be unable to get estimates since they are pre-computed
+ // tokens: [0, 10, 20]
+ // job range: [0, 10) - able to get estimate
+ // job range: [5, 15) - unable to get estimate
Pair<String, String> jobKeyRange = ConfigHelper.getInputKeyRange(conf);
Range<Token> jobRange = null;
if (jobKeyRange != null)
@@ -145,14 +158,18 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
Metadata metadata = cluster.getMetadata();
// canonical ranges and nodes holding replicas
- Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(keyspace, metadata);
-
+ Map<TokenRange, List<Host>> masterRangeNodes = getRangeMap(keyspace, metadata, getTargetDC(metadata, inputInitialAddress));
for (TokenRange range : masterRangeNodes.keySet())
{
if (jobRange == null)
{
- // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
+ for (TokenRange unwrapped : range.unwrap())
+ {
+ // for each tokenRange, pick a live owner and ask it for the byte-sized splits
+ SplitFuture task = new SplitFuture(new SplitCallable(unwrapped, masterRangeNodes.get(range), conf, session));
+ executor.submit(task);
+ splitfutures.add(task);
+ }
}
else
{
@@ -161,24 +178,61 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
{
for (TokenRange intersection: range.intersectWith(jobTokenRange))
{
- // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf, session)));
+ for (TokenRange unwrapped : intersection.unwrap())
+ {
+ // for each tokenRange, pick a live owner and ask it for the byte-sized splits
+ SplitFuture task = new SplitFuture(new SplitCallable(unwrapped, masterRangeNodes.get(range), conf, session));
+ executor.submit(task);
+ splitfutures.add(task);
+ }
}
}
}
}
// wait until we have all the results back
- for (Future<List<org.apache.hadoop.mapreduce.InputSplit>> futureInputSplits : splitfutures)
+ List<SplitFuture> failedTasks = new ArrayList<>();
+ int maxSplits = 0;
+ long expectedPartionsForFailedRanges = 0;
+ for (SplitFuture task : splitfutures)
{
try
{
- splits.addAll(futureInputSplits.get());
+ List<ColumnFamilySplit> tokenRangeSplits = task.get();
+ if (tokenRangeSplits.size() > maxSplits)
+ {
+ maxSplits = tokenRangeSplits.size();
+ expectedPartionsForFailedRanges = tokenRangeSplits.get(0).getLength();
+ }
}
catch (Exception e)
{
- throw new IOException("Could not get input splits", e);
+ failedTasks.add(task);
+ }
+ }
+ // The estimate is only stored on a single host; if that host is down then we cannot get the estimate.
+ // It's more than likely that a single host could be "too large" for one split, but there is no way of
+ // knowing!
+ // This logic attempts to guess the estimate from all the successful ranges
+ if (!failedTasks.isEmpty())
+ {
+ // if every split failed this will be 0
+ if (maxSplits == 0)
+ throwAllSplitsFailed(failedTasks);
+ for (SplitFuture task : failedTasks)
+ {
+ try
+ {
+ // the task failed, so this should throw
+ task.get();
+ }
+ catch (Exception cause)
+ {
+ logger.warn("Unable to get estimate for {}, the host {} had a exception; falling back to default estimate", task.splitCallable.tokenRange, task.splitCallable.hosts.get(0), cause);
+ }
}
+ for (SplitFuture task : failedTasks)
+ splits.addAll(toSplit(task.splitCallable.hosts, splitTokenRange(task.splitCallable.tokenRange, maxSplits, expectedPartionsForFailedRanges)));
}
}
finally
@@ -191,19 +245,73 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
return splits;
}
- private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+ private static IllegalStateException throwAllSplitsFailed(List<SplitFuture> failedTasks)
{
- return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
- metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+ IllegalStateException exception = new IllegalStateException("No successful tasks found");
+ for (SplitFuture task : failedTasks)
+ {
+ try
+ {
+ // the task failed, so this should throw
+ task.get();
+ }
+ catch (Exception cause)
+ {
+ exception.addSuppressed(cause);
+ }
+ }
+ throw exception;
}
- private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session)
+ private static String getTargetDC(Metadata metadata, String[] inputInitialAddress)
+ {
+ BiMultiValMap<InetAddress, String> addressToDc = new BiMultiValMap<>();
+ Multimap<String, InetAddress> dcToAddresses = addressToDc.inverse();
+
+ // the only way to match is off the broadcast addresses, so for all hosts do an existence check
+ Set<InetAddress> addresses = new HashSet<>(inputInitialAddress.length);
+ for (String inputAddress : inputInitialAddress)
+ addresses.addAll(parseAddress(inputAddress));
+
+ for (Host host : metadata.getAllHosts())
+ {
+ InetAddress address = host.getBroadcastAddress();
+ if (addresses.contains(address))
+ addressToDc.put(address, host.getDatacenter());
+ }
+
+ switch (dcToAddresses.keySet().size())
+ {
+ case 1:
+ return Iterables.getOnlyElement(dcToAddresses.keySet());
+ case 0:
+ throw new IllegalStateException("Input addresses could not be used to find DC; non match client metadata");
+ default:
+ // Multiple DCs found; attempt to pick the first based off the address list. This is to mimic the 2.1
+ // behavior, which connected to nodes in order and used the first node it could successfully connect to
+ // as the local DC; since the client abstracts this, we rely on existence as a proxy for connect.
+ for (String inputAddress : inputInitialAddress)
+ {
+ for (InetAddress add : parseAddress(inputAddress))
+ {
+ String dc = addressToDc.get(add);
+ // it's possible the address isn't in the cluster and the client dropped it, so ignore null
+ if (dc != null)
+ return dc;
+ }
+ }
+ // somehow we were able to connect to the cluster and find multiple DCs via matching, yet couldn't
+ // match again...
+ throw new AssertionError("Unable to infer datacenter from initial addresses; multiple datacenters found "
+ + dcToAddresses.keySet() + ", should only use addresses from one datacenter");
+ }
+ }
+
+ private static List<InetAddress> parseAddress(String str)
{
- int splitSize = ConfigHelper.getInputSplitSize(conf);
- int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
try
{
- return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
+ return Arrays.asList(InetAddress.getAllByName(str));
}
catch (Exception e)
{
@@ -211,22 +319,35 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
}
}
- private Map<TokenRange, Set<Host>> getRangeMap(String keyspace, Metadata metadata)
+ private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+ {
+ return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+ metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+ }
+
+ private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Host host, Configuration conf, Session session)
{
- return metadata.getTokenRanges()
- .stream()
- .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
+ int splitSize = ConfigHelper.getInputSplitSize(conf);
+ int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
+ return describeSplits(keyspace, cfName, range, host, splitSize, splitSizeMb, session);
}
- private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session)
+ private static Map<TokenRange, List<Host>> getRangeMap(String keyspace, Metadata metadata, String targetDC)
{
- String query = String.format("SELECT mean_partition_size, partitions_count " +
- "FROM %s.%s " +
- "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
- SchemaConstants.SYSTEM_KEYSPACE_NAME,
- SystemKeyspace.SIZE_ESTIMATES);
+ return CqlClientHelper.getLocalPrimaryRangeForDC(keyspace, metadata, targetDC);
+ }
- ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+ private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, Host host, int splitSize, int splitSizeMb, Session session)
+ {
+ // In 2.1 the host list was walked in order (only moving to the next host on IOException) and each host called
+ // org.apache.cassandra.service.StorageService.getSplits(java.lang.String, java.lang.String, org.apache.cassandra.dht.Range<org.apache.cassandra.dht.Token>, int);
+ // that call computes totalRowCountEstimate (used to compute the number of splits) and then splits the ring based off those estimates.
+ //
+ // The main difference is that the estimates in 2.1 were computed from the data, so any replica could answer.
+ // In 3.0 we rely on the CQL query below, which is local and only covers estimates for the primary range; this
+ // puts us in a sticky spot: if the node fails, what do we do? The 3.0 behavior only matches 2.1 iff all
+ // nodes are up and healthy
+ ResultSet resultSet = queryTableEstimates(session, host, keyspace, table, tokenRange);
Row row = resultSet.one();
@@ -250,14 +371,47 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
if (splitCount == 0)
{
Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
- wrappedTokenRange.put(tokenRange, (long) 128);
+ wrappedTokenRange.put(tokenRange, partitionCount == 0 ? 128L : partitionCount);
return wrappedTokenRange;
}
+ return splitTokenRange(tokenRange, splitCount, partitionCount / splitCount);
+ }
+
+ private static ResultSet queryTableEstimates(Session session, Host host, String keyspace, String table, TokenRange tokenRange)
+ {
+ try
+ {
+ String query = String.format("SELECT mean_partition_size, partitions_count " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = ? AND table_name = ? AND range_type = '%s' AND range_start = ? AND range_end = ?",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
+ SystemKeyspace.TABLE_ESTIMATES,
+ SystemKeyspace.TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY);
+ Statement stmt = new SimpleStatement(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()).setHost(host);
+ return session.execute(stmt);
+ }
+ catch (InvalidQueryException e)
+ {
+ // if the table doesn't exist, fall back to the old table. This is likely to return no records in a
+ // multi-DC setup, but should work fine in a single-DC setup.
+ String query = String.format("SELECT mean_partition_size, partitions_count " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
+ SystemKeyspace.LEGACY_SIZE_ESTIMATES);
+
+ Statement stmt = new SimpleStatement(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()).setHost(host);
+ return session.execute(stmt);
+ }
+ }
+
+ private static Map<TokenRange, Long> splitTokenRange(TokenRange tokenRange, int splitCount, long partitionCount)
+ {
List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
Map<TokenRange, Long> rangesWithLength = Maps.newHashMapWithExpectedSize(splitRanges.size());
for (TokenRange range : splitRanges)
- rangesWithLength.put(range, partitionCount/splitCount);
+ rangesWithLength.put(range, partitionCount);
return rangesWithLength;
}
@@ -277,56 +431,70 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
* Gets a token tokenRange and splits it up according to the suggested
* size into input splits that Hadoop can use.
*/
- class SplitCallable implements Callable<List<org.apache.hadoop.mapreduce.InputSplit>>
+ class SplitCallable implements Callable<List<ColumnFamilySplit>>
{
private final TokenRange tokenRange;
- private final Set<Host> hosts;
+ private final List<Host> hosts;
private final Configuration conf;
private final Session session;
- public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
+ public SplitCallable(TokenRange tokenRange, List<Host> hosts, Configuration conf, Session session)
{
- this.tokenRange = tr;
+ Preconditions.checkArgument(!hosts.isEmpty(), "hosts list requires at least 1 host but was empty");
+ this.tokenRange = tokenRange;
this.hosts = hosts;
this.conf = conf;
this.session = session;
}
- public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
+ public List<ColumnFamilySplit> call() throws Exception
{
- ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
- Map<TokenRange, Long> subSplits;
- subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
- // turn the sub-ranges into InputSplits
- String[] endpoints = new String[hosts.size()];
+ Map<TokenRange, Long> subSplits = getSubSplits(keyspace, cfName, tokenRange, hosts.get(0), conf, session);
+ return toSplit(hosts, subSplits);
+ }
- // hadoop needs hostname, not ip
- int endpointIndex = 0;
- for (Host endpoint : hosts)
- endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
+ }
- boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
+ private static class SplitFuture extends FutureTask<List<ColumnFamilySplit>>
+ {
+ private final SplitCallable splitCallable;
- for (Map.Entry<TokenRange, Long> subSplitEntry : subSplits.entrySet())
- {
- List<TokenRange> ranges = subSplitEntry.getKey().unwrap();
- for (TokenRange subrange : ranges)
- {
- ColumnFamilySplit split =
- new ColumnFamilySplit(
- partitionerIsOpp ?
- subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
- partitionerIsOpp ?
- subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
- subSplitEntry.getValue(),
- endpoints);
-
- logger.trace("adding {}", split);
- splits.add(split);
- }
- }
- return splits;
+ SplitFuture(SplitCallable splitCallable)
+ {
+ super(splitCallable);
+ this.splitCallable = splitCallable;
}
}
+
+ private List<ColumnFamilySplit> toSplit(List<Host> hosts, Map<TokenRange, Long> subSplits)
+ {
+ // turn the sub-ranges into InputSplits
+ String[] endpoints = new String[hosts.size()];
+
+ // hadoop needs hostname, not ip
+ int endpointIndex = 0;
+ for (Host endpoint : hosts)
+ endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
+
+ boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
+
+ ArrayList<ColumnFamilySplit> splits = new ArrayList<>();
+ for (Map.Entry<TokenRange, Long> subSplitEntry : subSplits.entrySet())
+ {
+ TokenRange subrange = subSplitEntry.getKey();
+ ColumnFamilySplit split =
+ new ColumnFamilySplit(
+ partitionerIsOpp ?
+ subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
+ partitionerIsOpp ?
+ subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
+ subSplitEntry.getValue(),
+ endpoints);
+
+ logger.trace("adding {}", split);
+ splits.add(split);
+ }
+ return splits;
+ }
}
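
The failure handling above boils down to: remember the largest split count and the per-split partition count seen among the successful ranges, then split each failed range the same way. A condensed sketch of that fallback (not the exact code above; TokenRange.splitEvenly is the java driver call the real code relies on):

import java.util.HashMap;
import java.util.Map;

import com.datastax.driver.core.TokenRange;

// Illustration only: reuse the shape of the most finely split successful range
// for a range whose primary owner could not be queried.
final class FailedRangeFallbackSketch
{
    static Map<TokenRange, Long> fallbackSplits(TokenRange failedRange, int maxSplits, long expectedPartitionsPerSplit)
    {
        Map<TokenRange, Long> splits = new HashMap<>();
        // the caller has already bailed out (throwAllSplitsFailed) when every range failed, so maxSplits > 0 here
        for (TokenRange sub : failedRange.splitEvenly(maxSplits))
            splits.put(sub, expectedPartitionsPerSplit);
        return splits;
    }
}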
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
index 256da2d..59b4eca 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
@@ -56,6 +56,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
private final CopyOnWriteArraySet<Host> liveReplicaHosts = new CopyOnWriteArraySet<>();
private final Set<InetAddress> replicaAddresses = new HashSet<>();
+ private final Set<String> allowedDCs = new CopyOnWriteArraySet<>();
public LimitedLocalNodeFirstLocalBalancingPolicy(String[] replicas)
{
@@ -78,15 +79,22 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
@Override
public void init(Cluster cluster, Collection<Host> hosts)
{
- List<Host> replicaHosts = new ArrayList<>();
+ // first find which DCs the user defined
+ Set<String> dcs = new HashSet<>();
for (Host host : hosts)
{
if (replicaAddresses.contains(host.getAddress()))
- {
+ dcs.add(host.getDatacenter());
+ }
+ // filter to all nodes within the targeted DCs
+ List<Host> replicaHosts = new ArrayList<>();
+ for (Host host : hosts)
+ {
+ if (dcs.contains(host.getDatacenter()))
replicaHosts.add(host);
- }
}
liveReplicaHosts.addAll(replicaHosts);
+ allowedDCs.addAll(dcs);
logger.trace("Initialized with replica hosts: {}", replicaHosts);
}
@@ -136,7 +144,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
@Override
public void onAdd(Host host)
{
- if (replicaAddresses.contains(host.getAddress()))
+ if (liveReplicaHosts.contains(host))
{
liveReplicaHosts.add(host);
logger.trace("Added a new host {}", host);
@@ -146,7 +154,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
@Override
public void onUp(Host host)
{
- if (replicaAddresses.contains(host.getAddress()))
+ if (liveReplicaHosts.contains(host))
{
liveReplicaHosts.add(host);
logger.trace("The host {} is now up", host);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index fd350bf..08f0612 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3677,14 +3677,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
String keyspace = tablesByKeyspace.getKey();
if (!Schema.instance.getKeyspaces().contains(keyspace))
{
- SystemKeyspace.clearSizeEstimates(keyspace);
+ SystemKeyspace.clearEstimates(keyspace);
}
else
{
for (String table : tablesByKeyspace.getValue())
{
if (Schema.instance.getTableMetadataRef(keyspace, table) == null)
- SystemKeyspace.clearSizeEstimates(keyspace, table);
+ SystemKeyspace.clearEstimates(keyspace, table);
}
}
}
@@ -3912,6 +3912,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return localDCPrimaryRanges;
}
+ public Collection<Range<Token>> getLocalPrimaryRange()
+ {
+ return getLocalPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddressAndPort());
+ }
+
+ public Collection<Range<Token>> getLocalPrimaryRangeForEndpoint(InetAddressAndPort referenceEndpoint)
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ TokenMetadata tokenMetadata = this.tokenMetadata.cloneOnlyTokenMap();
+ String dc = snitch.getDatacenter(referenceEndpoint);
+ Set<Token> tokens = new HashSet<>(tokenMetadata.getTokens(referenceEndpoint));
+
+ // filter tokens to the single DC
+ List<Token> filteredTokens = Lists.newArrayList();
+ for (Token token : tokenMetadata.sortedTokens())
+ {
+ InetAddressAndPort endpoint = tokenMetadata.getEndpoint(token);
+ if (dc.equals(snitch.getDatacenter(endpoint)))
+ filteredTokens.add(token);
+ }
+
+ return getAllRanges(filteredTokens).stream()
+ .filter(t -> tokens.contains(t.right))
+ .collect(Collectors.toList());
+ }
+
/**
* Get all ranges that span the ring given a set
* of tokens. All ranges are in sorted order of
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 8e91342..6e7704a 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -193,6 +193,45 @@ public class StorageServiceServerTest
// no need to insert extra data, even an "empty" database will have a little information in the system keyspace
StorageService.instance.takeSnapshot(UUID.randomUUID().toString(), SchemaConstants.SCHEMA_KEYSPACE_NAME);
}
+ @Test
+ public void testLocalPrimaryRangeForEndpointWithNetworkTopologyStrategy() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ // DC1
+ metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
+
+ // DC2
+ metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put("DC1", "2");
+ configOptions.put("DC2", "2");
+ configOptions.put(ReplicationParams.CLASS, "NetworkTopologyStrategy");
+
+ Keyspace.clear("Keyspace1");
+ KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, configOptions));
+ Schema.instance.load(meta);
+
+ Collection<Range<Token>> primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.1"));
+ assertEquals(1, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A"))));
+
+ primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.2"));
+ assertEquals(1, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("C"))));
+
+ primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.4"));
+ assertEquals(1, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("B"))));
+
+ primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
+ assertEquals(1, primaryRanges.size());
+ assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("D"))));
+ }
@Test
public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategy() throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org