You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/12/10 15:07:45 UTC
[10/11] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index cca59cf,e320f30..f54bc03
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -104,10 -96,10 +104,10 @@@ public abstract class SSTableWriter ext
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<Index> indexes,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker);
++ return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers(descriptor, indexes, lifecycleNewTracker.opType()), lifecycleNewTracker);
}
public static SSTableWriter create(Descriptor descriptor,
@@@ -118,13 -108,13 +118,13 @@@
int sstableLevel,
SerializationHeader header,
Collection<Index> indexes,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
+ TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, txn);
++ return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, isTransient, sstableLevel, header, indexes, lifecycleNewTracker);
}
- public static SSTableWriter create(CFMetaData metadata,
+ public static SSTableWriter create(TableMetadataRef metadata,
Descriptor descriptor,
long keyCount,
long repairedAt,
@@@ -133,26 -121,36 +133,26 @@@
int sstableLevel,
SerializationHeader header,
Collection<Index> indexes,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, lifecycleNewTracker);
- }
-
- public static SSTableWriter create(String filename,
- long keyCount,
- long repairedAt,
- int sstableLevel,
- SerializationHeader header,
- Collection<Index> indexes,
- LifecycleNewTracker lifecycleNewTracker)
- {
- return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, lifecycleNewTracker);
+ MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, txn);
++ return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, indexes, lifecycleNewTracker);
}
@VisibleForTesting
- public static SSTableWriter create(String filename,
+ public static SSTableWriter create(Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
+ boolean isTransient,
SerializationHeader header,
Collection<Index> indexes,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, txn);
- Descriptor descriptor = Descriptor.fromFilename(filename);
- return create(descriptor, keyCount, repairedAt, 0, header, indexes, lifecycleNewTracker);
++ return create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, 0, header, indexes, lifecycleNewTracker);
}
- private static Set<Component> components(CFMetaData metadata)
+ private static Set<Component> components(TableMetadata metadata)
{
Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
Component.PRIMARY_INDEX,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index d65a7c0,9af7dc0..448808c
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@@ -19,16 -19,11 +19,16 @@@ package org.apache.cassandra.io.sstable
import java.util.Collection;
import java.util.Set;
+import java.util.UUID;
-import org.apache.cassandra.config.CFMetaData;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
- import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.*;
@@@ -93,10 -87,9 +93,10 @@@ public class BigFormat implements SSTab
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, lifecycleNewTracker);
+ SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
- return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, txn);
++ return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, lifecycleNewTracker);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 7513e95,9083cd3..70f568d
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -17,10 -17,15 +17,11 @@@
*/
package org.apache.cassandra.io.sstable.format.big;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
+ import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -73,10 -75,10 +73,10 @@@ public class BigTableWriter extends SST
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
- LifecycleTransaction txn)
+ LifecycleNewTracker lifecycleNewTracker)
{
- super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
+ super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers);
- txn.trackNew(this); // must track before any files are created
+ lifecycleNewTracker.trackNew(this); // must track before any files are created
if (compression)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
index 400f119,0000000..8826381
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@@ -1,226 -1,0 +1,227 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
++import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
+{
+ private static final Logger logger = LoggerFactory.getLogger(BigTableZeroCopyWriter.class);
+
+ private final TableMetadataRef metadata;
+ private volatile SSTableReader finalReader;
+ private final Map<Component.Type, SequentialWriter> componentWriters;
+
+ private static final SequentialWriterOption WRITER_OPTION =
+ SequentialWriterOption.newBuilder()
+ .trickleFsync(false)
+ .bufferSize(2 << 20)
+ .bufferType(BufferType.OFF_HEAP)
+ .build();
+
+ private static final ImmutableSet<Component> SUPPORTED_COMPONENTS =
+ ImmutableSet.of(Component.DATA,
+ Component.PRIMARY_INDEX,
+ Component.SUMMARY,
+ Component.STATS,
+ Component.COMPRESSION_INFO,
+ Component.FILTER,
+ Component.DIGEST,
+ Component.CRC);
+
+ public BigTableZeroCopyWriter(Descriptor descriptor,
+ TableMetadataRef metadata,
- LifecycleTransaction txn,
++ LifecycleNewTracker lifecycleNewTracker,
+ final Collection<Component> components)
+ {
+ super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
+
- txn.trackNew(this);
++ lifecycleNewTracker.trackNew(this);
+ this.metadata = metadata;
+ this.componentWriters = new EnumMap<>(Component.Type.class);
+
+ if (!SUPPORTED_COMPONENTS.containsAll(components))
+ throw new AssertionError(format("Unsupported streaming component detected %s",
+ Sets.difference(ImmutableSet.copyOf(components), SUPPORTED_COMPONENTS)));
+
+ for (Component c : components)
+ componentWriters.put(c.type, makeWriter(descriptor, c));
+ }
+
+ private static SequentialWriter makeWriter(Descriptor descriptor, Component component)
+ {
+ return new SequentialWriter(new File(descriptor.filenameFor(component)), WRITER_OPTION, false);
+ }
+
+ private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
+ {
+ final int BUFFER_SIZE = 1 << 20;
+ long bytesRead = 0;
+ byte[] buff = new byte[BUFFER_SIZE];
+ try
+ {
+ while (bytesRead < size)
+ {
+ int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
+ in.readFully(buff, 0, toRead);
+ int count = Math.min(toRead, BUFFER_SIZE);
+ out.write(buff, 0, count);
+ bytesRead += count;
+ }
+ out.sync();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, out.getPath());
+ }
+ }
+
+ @Override
+ public boolean append(UnfilteredRowIterator partition)
+ {
+ throw new UnsupportedOperationException("Operation not supported by BigTableBlockWriter");
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+ {
+ return finish(openResult);
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(boolean openResult)
+ {
+ setOpenResult(openResult);
+ return finished();
+ }
+
+ @Override
+ public Collection<SSTableReader> finished()
+ {
+ if (finalReader == null)
+ finalReader = SSTableReader.open(descriptor, components, metadata);
+
+ return ImmutableList.of(finalReader);
+ }
+
+ @Override
+ public SSTableMultiWriter setOpenResult(boolean openResult)
+ {
+ return null;
+ }
+
+ @Override
+ public long getFilePointer()
+ {
+ return 0;
+ }
+
+ @Override
+ public TableId getTableId()
+ {
+ return metadata.id;
+ }
+
+ @Override
+ public Throwable commit(Throwable accumulate)
+ {
+ for (SequentialWriter writer : componentWriters.values())
+ accumulate = writer.commit(accumulate);
+ return accumulate;
+ }
+
+ @Override
+ public Throwable abort(Throwable accumulate)
+ {
+ for (SequentialWriter writer : componentWriters.values())
+ accumulate = writer.abort(accumulate);
+ return accumulate;
+ }
+
+ @Override
+ public void prepareToCommit()
+ {
+ for (SequentialWriter writer : componentWriters.values())
+ writer.prepareToCommit();
+ }
+
+ @Override
+ public void close()
+ {
+ for (SequentialWriter writer : componentWriters.values())
+ writer.close();
+ }
+
+ public void writeComponent(Component.Type type, DataInputPlus in, long size)
+ {
+ logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+
+ if (in instanceof RebufferingByteBufDataInputPlus)
+ write((RebufferingByteBufDataInputPlus) in, size, componentWriters.get(type));
+ else
+ write(in, size, componentWriters.get(type));
+ }
+
+ private void write(RebufferingByteBufDataInputPlus in, long size, SequentialWriter writer)
+ {
+ logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
+
+ try
+ {
+ long bytesWritten = in.consumeUntil(writer, size);
+
+ if (bytesWritten != size)
+ throw new IOException(format("Failed to read correct number of bytes from channel %s", writer));
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, writer.getPath());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 5e60f7a,cf28bf7..874097d
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -436,16 -309,14 +436,16 @@@ public abstract class AbstractReplicati
{
try
{
- if (Integer.parseInt(rf) < 0)
+ ReplicationFactor rf = ReplicationFactor.fromString(s);
+ if (rf.hasTransientReplicas())
{
- throw new ConfigurationException("Replication factor must be non-negative; found " + rf);
+ if (DatabaseDescriptor.getNumTokens() > 1)
- throw new ConfigurationException("Transient replication is not supported with vnodes yet");
++ throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet"));
}
}
- catch (NumberFormatException e2)
+ catch (IllegalArgumentException e)
{
- throw new ConfigurationException("Replication factor must be numeric; found " + rf);
+ throw new ConfigurationException(e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0a96f4c,5388dd6..31c60be
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -128,8 -196,83 +128,8 @@@ public class StreamReceiveTask extends
task.session.taskCompleted(task);
return;
}
- cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- hasCDC = cfs.metadata.params.cdc;
-
- Collection<SSTableReader> readers = task.sstables;
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- /*
- * We have a special path for views and for CDC.
- *
- * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
- * through the same write path as normal mutations. This also ensures any 2is are also updated.
- *
- * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
- * can be archived by the CDC process on discard.
- */
- if (hasViews || hasCDC)
- {
- for (SSTableReader reader : readers)
- {
- Keyspace ks = Keyspace.open(reader.getKeyspaceName());
- try (ISSTableScanner scanner = reader.getScanner())
- {
- while (scanner.hasNext())
- {
- try (UnfilteredRowIterator rowIterator = scanner.next())
- {
- Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
-
- // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
- // before transaction is done.
- //
- // If the CFS has CDC, however, these updates need to be written to the CommitLog
- // so they get archived into the cdc_raw folder
- ks.apply(m, hasCDC, true, false);
- }
- }
- }
- }
- }
- else
- {
- task.finishTransaction();
-
- logger.debug("[Stream #{}] Received {} sstables from {} ({})", task.session.planId(), readers.size(), task.session.peer, readers);
- // add sstables and build secondary indexes
- cfs.addSSTables(readers);
- cfs.indexManager.buildAllIndexesBlocking(readers);
-
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
- {
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
- task.receiver.finished();;
- if (cfs.isRowCacheEnabled())
- {
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getTableName());
- }
-
- if (cfs.metadata.isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getTableName());
- }
- }
- }
- }
++ task.receiver.finished();
task.session.taskCompleted(task);
}
catch (Throwable t)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/33eada06/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org