You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/07/23 15:28:07 UTC
[accumulo] branch main updated: Make CachableBlockFile builder
require nonNull for cacheId (#2160)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 7d176b6 Make CachableBlockFile builder require nonNull for cacheId (#2160)
7d176b6 is described below
commit 7d176b60fc6a2ce28fcc3cfdc9a3b68684c76f87
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Jul 23 11:27:55 2021 -0400
Make CachableBlockFile builder require nonNull for cacheId (#2160)
---
.../org/apache/accumulo/core/client/rfile/RFileScanner.java | 6 +++---
.../accumulo/core/client/rfile/RFileSummariesRetriever.java | 6 +++---
.../core/file/blockfile/impl/CachableBlockFile.java | 10 +++-------
.../org/apache/accumulo/core/summary/SummaryReader.java | 8 ++++----
.../accumulo/core/file/rfile/MultiLevelIndexTest.java | 8 ++++----
.../java/org/apache/accumulo/core/file/rfile/RFileTest.java | 13 +++++++------
6 files changed, 24 insertions(+), 27 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 3413aec..bb4356d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -347,9 +347,9 @@ class RFileScanner extends ScannerOptions implements Scanner {
for (int i = 0; i < sources.length; i++) {
// TODO may have been a bug with multiple files and caching in older version...
FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
- CachableBuilder cb = new CachableBuilder().cacheId("source-" + i).input(inputStream)
- .length(sources[i].getLength()).conf(opts.in.getConf()).cacheProvider(cacheProvider)
- .cryptoService(cryptoService);
+ CachableBuilder cb =
+ new CachableBuilder().input(inputStream, "source-" + i).length(sources[i].getLength())
+ .conf(opts.in.getConf()).cacheProvider(cacheProvider).cryptoService(cryptoService);
readers.add(new RFile.Reader(cb));
}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
index 8dbb7a1..9341f50 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
@@ -93,9 +93,9 @@ class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions
try {
SummaryCollection all = new SummaryCollection();
CryptoService cservice = CryptoServiceFactory.newInstance(acuconf, ClassloaderType.JAVA);
- for (RFileSource source : sources) {
- SummaryReader fileSummary = SummaryReader.load(in.getFileSystem().getConf(),
- source.getInputStream(), source.getLength(), summarySelector, factory, cservice);
+ for (int i = 0; i < sources.length; i++) {
+ SummaryReader fileSummary = SummaryReader.load(in.getFileSystem().getConf(), sources[i],
+ "source-" + i, summarySelector, factory, cservice);
SummaryCollection sc = fileSummary
.getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow)));
all.merge(sc, factory);
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 2b15fe3..b4c24e2 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -77,11 +77,6 @@ public class CachableBlockFile {
Configuration hadoopConf = null;
CryptoService cryptoService = null;
- public CachableBuilder cacheId(String id) {
- this.cacheId = id;
- return this;
- }
-
public CachableBuilder conf(Configuration hadoopConf) {
this.hadoopConf = hadoopConf;
return this;
@@ -94,7 +89,8 @@ public class CachableBlockFile {
return this;
}
- public CachableBuilder input(InputStream is) {
+ public CachableBuilder input(InputStream is, String cacheId) {
+ this.cacheId = cacheId;
this.inputSupplier = () -> is;
return this;
}
@@ -359,7 +355,7 @@ public class CachableBlockFile {
}
public Reader(CachableBuilder b) {
- this.cacheId = b.cacheId;
+ this.cacheId = Objects.requireNonNull(b.cacheId);
this.inputSupplier = b.inputSupplier;
this.lengthSupplier = b.lengthSupplier;
this.fileLenCache = b.fileLenCache;
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index ce18259..341b6d9 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.core.summary;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
@@ -30,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
+import org.apache.accumulo.core.client.rfile.RFileSource;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
@@ -174,11 +174,11 @@ public class SummaryReader {
return fileSummaries;
}
- public static SummaryReader load(Configuration conf, InputStream inputStream, long length,
+ public static SummaryReader load(Configuration conf, RFileSource source, String cacheId,
Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory,
CryptoService cryptoService) throws IOException {
- CachableBuilder cb = new CachableBuilder().input(inputStream).length(length).conf(conf)
- .cryptoService(cryptoService);
+ CachableBuilder cb = new CachableBuilder().input(source.getInputStream(), cacheId)
+ .length(source.getLength()).conf(conf).cryptoService(cryptoService);
return load(new CachableBlockFile.Reader(cb), summarySelector, factory);
}
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 6eaabc3..fb2cdd0 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.file.rfile;
+import static org.apache.accumulo.core.crypto.CryptoServiceFactory.ClassloaderType.JAVA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -29,7 +30,6 @@ import java.util.Random;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
-import org.apache.accumulo.core.crypto.CryptoServiceFactory.ClassloaderType;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder;
@@ -66,7 +66,7 @@ public class MultiLevelIndexTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", hadoopConf,
- CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA));
+ CryptoServiceFactory.newInstance(aconf, JAVA));
BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
@@ -86,8 +86,8 @@ public class MultiLevelIndexTest {
byte[] data = baos.toByteArray();
SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
FSDataInputStream in = new FSDataInputStream(bais);
- CachableBuilder cb = new CachableBuilder().input(in).length(data.length).conf(hadoopConf)
- .cryptoService(CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA));
+ CachableBuilder cb = new CachableBuilder().input(in, "source-1").length(data.length)
+ .conf(hadoopConf).cryptoService(CryptoServiceFactory.newInstance(aconf, JAVA));
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(cb);
Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 66572ed..ed62565 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -309,8 +309,8 @@ public class RFileTest {
LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA);
- CachableBuilder cb = new CachableBuilder().cacheId("source-1").input(in).length(fileLength)
- .conf(conf).cacheProvider(new BasicCacheProvider(indexCache, dataCache)).cryptoService(
+ CachableBuilder cb = new CachableBuilder().input(in, "source-1").length(fileLength).conf(conf)
+ .cacheProvider(new BasicCacheProvider(indexCache, dataCache)).cryptoService(
CryptoServiceFactory.newInstance(accumuloConfiguration, ClassloaderType.JAVA));
reader = new RFile.Reader(cb);
if (cfsi)
@@ -1746,10 +1746,11 @@ public class RFileTest {
aconf.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000));
BlockCacheManager manager = BlockCacheManagerFactory.getInstance(aconf);
manager.start(new BlockCacheConfiguration(aconf));
- CachableBuilder cb = new CachableBuilder().input(in2).length(data.length).conf(hadoopConf)
- .cryptoService(CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA))
- .cacheProvider(new BasicCacheProvider(manager.getBlockCache(CacheType.INDEX),
- manager.getBlockCache(CacheType.DATA)));
+ CachableBuilder cb =
+ new CachableBuilder().input(in2, "cache-1").length(data.length).conf(hadoopConf)
+ .cryptoService(CryptoServiceFactory.newInstance(aconf, ClassloaderType.JAVA))
+ .cacheProvider(new BasicCacheProvider(manager.getBlockCache(CacheType.INDEX),
+ manager.getBlockCache(CacheType.DATA)));
Reader reader = new RFile.Reader(cb);
checkIndex(reader);