You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/08/12 13:47:16 UTC
[2/4] cassandra git commit: Merge branch cassandra-2.2 into
cassandra-3.0
Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c85749e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c85749e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c85749e2
Branch: refs/heads/trunk
Commit: c85749e2bc5c65a03b00994565cd0b6b1a642e65
Parents: 2a828af e86d531
Author: Benjamin Lerer <b....@gmail.com>
Authored: Fri Aug 12 15:34:01 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Fri Aug 12 15:37:13 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/service/StorageProxy.java | 49 +++++++++++++-------
2 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c85749e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3ce9f9e,ddc6720..959b967
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,39 -1,5 +1,40 @@@
-2.2.8
+3.0.9
+ * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
+ * Backport CASSANDRA-12002 (CASSANDRA-12177)
+ * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
+ * Fix potential bad messaging service message for paged range reads
+ within mixed-version 3.x clusters (CASSANDRA-12249)
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
+ * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+ to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+ a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+ those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
+ * Update StorageProxy range metrics for timeouts, failures and unavailables (CASSANDRA-9507)
* Add Sigar to classes included in clientutil.jar (CASSANDRA-11635)
* Add decay to histograms and timers used for metrics (CASSANDRA-11752)
* Fix hanging stream session (CASSANDRA-10992)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c85749e2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 483da67,7b7979d..8a151f2
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1955,201 -1716,267 +1955,218 @@@ public class StorageProxy implements St
}
}
- public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
- throws UnavailableException, ReadFailureException, ReadTimeoutException
+ private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
{
- Tracing.trace("Computing ranges to query");
- long startTime = System.nanoTime();
+ private final ReadCallback handler;
+ private PartitionIterator result;
- Keyspace keyspace = Keyspace.open(command.keyspace);
- List<Row> rows;
- // now scan until we have enough results
- try
+ private SingleRangeResponse(ReadCallback handler)
{
- int liveRowCount = 0;
- boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
- rows = new ArrayList<>();
+ this.handler = handler;
+ }
- // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
- // expensive in clusters with vnodes)
- List<? extends AbstractBounds<RowPosition>> ranges;
- if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
- ranges = command.keyRange.unwrap();
- else
- ranges = getRestrictedRanges(command.keyRange);
-
- // determine the number of rows to be fetched and the concurrency factor
- int rowsToBeFetched = command.limit();
- int concurrencyFactor;
- if (command.requiresScanningAllRanges())
- {
- // all nodes must be queried
- rowsToBeFetched *= ranges.size();
- concurrencyFactor = ranges.size();
- logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
- command.limit(),
- ranges.size(),
- concurrencyFactor);
- Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
- ranges.size(), concurrencyFactor);
+ private void waitForResponse() throws ReadTimeoutException
+ {
+ if (result != null)
+ return;
+
+ try
+ {
+ result = handler.get();
}
- else
+ catch (DigestMismatchException e)
{
- // our estimate of how many result rows there will be per-range
- float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
- // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
- // fetch enough rows in the first round
- resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
- concurrencyFactor = resultRowsPerRange == 0.0
- ? 1
- : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
-
- logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
- resultRowsPerRange,
- command.limit(),
- ranges.size(),
- concurrencyFactor);
- Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
- ranges.size(),
- concurrencyFactor,
- resultRowsPerRange);
- }
-
- boolean haveSufficientRows = false;
- int i = 0;
- AbstractBounds<RowPosition> nextRange = null;
- List<InetAddress> nextEndpoints = null;
- List<InetAddress> nextFilteredEndpoints = null;
- while (i < ranges.size())
- {
- List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
- int concurrentFetchStartingIndex = i;
- int concurrentRequests = 0;
- while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
- {
- AbstractBounds<RowPosition> range = nextRange == null
- ? ranges.get(i)
- : nextRange;
- List<InetAddress> liveEndpoints = nextEndpoints == null
- ? getLiveSortedEndpoints(keyspace, range.right)
- : nextEndpoints;
- List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
- ? consistency_level.filterForQuery(keyspace, liveEndpoints)
- : nextFilteredEndpoints;
- ++i;
- ++concurrentRequests;
-
- // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
- // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
- // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
- while (i < ranges.size())
- {
- nextRange = ranges.get(i);
- nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
- nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
-
- // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
- // don't know how to deal with a wrapping range.
- // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
- // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
- // wire compatibility, so It's likely easier not to bother;
- if (range.right.isMinimum())
- break;
-
- List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
-
- // Check if there is enough endpoint for the merge to be possible.
- if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
- break;
-
- List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
-
- // Estimate whether merging will be a win or not
- if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
- break;
-
- // If we get there, merge this range and the next one
- range = range.withNewRight(nextRange.right);
- liveEndpoints = merged;
- filteredEndpoints = filteredMerged;
- ++i;
- }
-
- AbstractRangeCommand nodeCmd = command.forSubRange(range);
+ throw new AssertionError(e); // no digests in range slices yet
+ }
+ }
- // collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
- List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
- ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
- handler.assureSufficientLiveNodes();
- resolver.setSources(filteredEndpoints);
- if (filteredEndpoints.size() == 1
- && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
- {
- StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
- }
- else
- {
- MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
- for (InetAddress endpoint : filteredEndpoints)
- {
- Tracing.trace("Enqueuing request to {}", endpoint);
- MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
- }
- }
- scanHandlers.add(Pair.create(nodeCmd, handler));
- }
- Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
+ protected RowIterator computeNext()
+ {
+ waitForResponse();
+ return result.hasNext() ? result.next() : endOfData();
+ }
- List<AsyncOneResponse> repairResponses = new ArrayList<>();
- for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
- {
- ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
- RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
+ public void close()
+ {
+ if (result != null)
+ result.close();
+ }
+ }
- try
- {
- for (Row row : handler.get())
- {
- rows.add(row);
- if (countLiveRows)
- liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
- }
- repairResponses.addAll(resolver.repairResults);
- }
- catch (ReadTimeoutException|ReadFailureException ex)
- {
- // we timed out or failed waiting for responses
- int blockFor = consistency_level.blockFor(keyspace);
- int responseCount = resolver.responses.size();
- String gotData = responseCount > 0
- ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
- : "";
-
- boolean isTimeout = ex instanceof ReadTimeoutException;
- if (Tracing.isTracing())
- {
- Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
- (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size());
- }
- else if (logger.isDebugEnabled())
- {
- logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
- (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
- }
- throw ex;
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // no digests in range slices yet
- }
+ private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+ {
+ private final Iterator<RangeForQuery> ranges;
+ private final int totalRangeCount;
+ private final PartitionRangeReadCommand command;
+ private final Keyspace keyspace;
+ private final ConsistencyLevel consistency;
- // if we're done, great, otherwise, move to the next range
- int count = countLiveRows ? liveRowCount : rows.size();
- if (count >= rowsToBeFetched)
- {
- haveSufficientRows = true;
- break;
- }
- }
+ private final long startTime;
+ private DataLimits.Counter counter;
+ private PartitionIterator sentQueryIterator;
- try
- {
- FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
- }
- catch (TimeoutException ex)
- {
- // We got all responses, but timed out while repairing
- int blockFor = consistency_level.blockFor(keyspace);
- if (Tracing.isTracing())
- Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
- else
- logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
- throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
- }
+ private int concurrencyFactor;
+ // The two following "metric" are maintained to improve the concurrencyFactor
+ // when it was not good enough initially.
+ private int liveReturned;
+ private int rangesQueried;
- if (haveSufficientRows)
- return command.postReconciliationProcessing(rows);
+ public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+ {
+ this.command = command;
+ this.concurrencyFactor = concurrencyFactor;
+ this.startTime = System.nanoTime();
+ this.ranges = new RangeMerger(ranges, keyspace, consistency);
+ this.totalRangeCount = ranges.rangeCount();
+ this.consistency = consistency;
+ this.keyspace = keyspace;
+ }
- // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
- // based on the results we've seen so far (as long as we still have ranges left to query)
- if (i < ranges.size())
+ public RowIterator computeNext()
+ {
- while (sentQueryIterator == null || !sentQueryIterator.hasNext())
++ try
+ {
- // If we don't have more range to handle, we're done
- if (!ranges.hasNext())
- return endOfData();
-
- // else, sends the next batch of concurrent queries (after having close the previous iterator)
- if (sentQueryIterator != null)
++ while (sentQueryIterator == null || !sentQueryIterator.hasNext())
{
- liveReturned += counter.counted();
- sentQueryIterator.close();
- float fetchedRows = countLiveRows ? liveRowCount : rows.size();
- float remainingRows = rowsToBeFetched - fetchedRows;
- float actualRowsPerRange;
- if (fetchedRows == 0.0)
- {
- // we haven't actually gotten any results, so query all remaining ranges at once
- actualRowsPerRange = 0.0f;
- concurrencyFactor = ranges.size() - i;
- }
- else
++ // If we don't have more range to handle, we're done
++ if (!ranges.hasNext())
++ return endOfData();
+
- // It's not the first batch of queries and we're not done, so we we can use what has been
- // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
- updateConcurrencyFactor();
++ // else, sends the next batch of concurrent queries (after having close the previous iterator)
++ if (sentQueryIterator != null)
+ {
- actualRowsPerRange = fetchedRows / i;
- concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
++ liveReturned += counter.counted();
++ sentQueryIterator.close();
++
++ // It's not the first batch of queries and we're not done, so we we can use what has been
++ // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
++ updateConcurrencyFactor();
+ }
- logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
- actualRowsPerRange, (int) remainingRows, concurrencyFactor);
++ sentQueryIterator = sendNextRequests();
}
- sentQueryIterator = sendNextRequests();
- }
+
- return sentQueryIterator.next();
++ return sentQueryIterator.next();
++ }
++ catch (UnavailableException e)
++ {
++ rangeMetrics.unavailables.mark();
++ throw e;
++ }
++ catch (ReadTimeoutException e)
++ {
++ rangeMetrics.timeouts.mark();
++ throw e;
++ }
++ catch (ReadFailureException e)
++ {
++ rangeMetrics.failures.mark();
++ throw e;
+ }
}
- catch (ReadTimeoutException e)
+
+ private void updateConcurrencyFactor()
{
- rangeMetrics.timeouts.mark();
- throw e;
+ if (liveReturned == 0)
+ {
+ // we haven't actually gotten any results, so query all remaining ranges at once
+ concurrencyFactor = totalRangeCount - rangesQueried;
+ return;
+ }
+
+ // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
+ // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
+ int remainingRows = command.limits().count() - liveReturned;
+ float rowsPerRange = (float)liveReturned / (float)rangesQueried;
+ concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
+ logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+ rowsPerRange, (int) remainingRows, concurrencyFactor);
}
- catch (UnavailableException e)
+
+ private SingleRangeResponse query(RangeForQuery toQuery)
{
- rangeMetrics.unavailables.mark();
- throw e;
+ PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
+
+ DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
+
+ int blockFor = consistency.blockFor(keyspace);
+ int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
+ List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
+ ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
+
+ handler.assureSufficientLiveNodes();
+
+ if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
+ {
+ StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
+ }
+ else
+ {
+ for (InetAddress endpoint : toQuery.filteredEndpoints)
+ {
+ MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
+ Tracing.trace("Enqueuing request to {}", endpoint);
+ MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+ }
+ }
+
+ return new SingleRangeResponse(handler);
}
- catch (ReadFailureException e)
+
+ private PartitionIterator sendNextRequests()
{
- rangeMetrics.failures.mark();
- throw e;
+ List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
+ for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+ {
+ concurrentQueries.add(query(ranges.next()));
+ ++rangesQueried;
+ }
+
+ Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
+ // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
+ // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE.
+ counter = DataLimits.NONE.newCounter(command.nowInSec(), true);
+ return counter.applyTo(PartitionIterators.concat(concurrentQueries));
}
- finally
+
+ public void close()
{
- long latency = System.nanoTime() - startTime;
- rangeMetrics.addNano(latency);
- Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+ try
+ {
+ if (sentQueryIterator != null)
+ sentQueryIterator.close();
+ }
+ finally
+ {
+ long latency = System.nanoTime() - startTime;
+ rangeMetrics.addNano(latency);
+ Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+ }
}
- return command.postReconciliationProcessing(rows);
+ }
+
+ @SuppressWarnings("resource")
+ public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
- throws UnavailableException, ReadFailureException, ReadTimeoutException
+ {
+ Tracing.trace("Computing ranges to query");
+
+ Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+ RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
+
+ // our estimate of how many result rows there will be per-range
+ float resultsPerRange = estimateResultsPerRange(command, keyspace);
+ // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+ // fetch enough rows in the first round
+ resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+ int concurrencyFactor = resultsPerRange == 0.0
+ ? 1
+ : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
+ logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+ resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
+ Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
+
+ // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
+
+ return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
}
public Map<String, List<String>> getSchemaVersions()