You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/05 18:40:44 UTC
[57/61] [abbrv] lucene-solr git commit: SOLR-8575: Fix HDFSLogReader
replay status numbers and a performance bug where we can reopen
FSDataInputStream too often.
SOLR-8575: Fix HDFSLogReader replay status numbers and a performance bug where we can reopen FSDataInputStream too often.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ec4c7231
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ec4c7231
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ec4c7231
Branch: refs/heads/lucene-6835
Commit: ec4c72310f3548b93139b25a12d6e9a16ac9e322
Parents: 0f43705
Author: Mark Miller <ma...@apache.org>
Authored: Thu Feb 4 17:54:01 2016 -0500
Committer: Mark Miller <ma...@apache.org>
Committed: Thu Feb 4 17:54:01 2016 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../apache/solr/update/HdfsTransactionLog.java | 22 +--
.../java/org/apache/solr/update/UpdateLog.java | 2 +-
.../TlogReplayBufferedWhileIndexingTest.java | 136 +++++++++++++++++++
...HdfsTlogReplayBufferedWhileIndexingTest.java | 63 +++++++++
5 files changed, 213 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec4c7231/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c396d39..348cf01 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -445,6 +445,8 @@ Bug Fixes
* SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
(Jan Høydahl, Steve Rowe)
+* SOLR-8575: Fix HDFSLogReader replay status numbers and a performance bug where we can reopen
+ FSDataInputStream too often. (Mark Miller, Patrick Dvorack)
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec4c7231/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 3db65c6..bff3486 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -390,16 +390,18 @@ public class HdfsTransactionLog extends TransactionLog {
// we actually need a new reader to
// see if any data was added by the writer
- if (fis.position() >= sz) {
+ if (pos >= sz) {
+ log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
+
+ synchronized (HdfsTransactionLog.this) {
+ sz = fos.size();
+ }
+
fis.close();
tlogOutStream.hflush();
- try {
- FSDataInputStream fdis = fs.open(tlogFile);
- fis = new FSDataFastInputStream(fdis, pos);
- sz = fs.getFileStatus(tlogFile).getLen();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+
+ FSDataInputStream fdis = fs.open(tlogFile);
+ fis = new FSDataFastInputStream(fdis, pos);
}
if (pos == 0) {
@@ -446,7 +448,7 @@ public class HdfsTransactionLog extends TransactionLog {
@Override
public long currentSize() {
- return sz;
+ return fos.size();
}
}
@@ -604,5 +606,3 @@ class FSDataFastInputStream extends FastInputStream {
return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
}
}
-
-
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec4c7231/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 78c30b9..214d9a5 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1324,7 +1324,7 @@ public class UpdateLog implements PluginInfoInitialized {
loglog.info(
"log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
- Math.round(cpos / (double) csize * 100.));
+ Math.floor(cpos / (double) csize * 100.));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec4c7231/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java b/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java
new file mode 100644
index 0000000..5c03a60
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TlogReplayBufferedWhileIndexingTest.java
@@ -0,0 +1,136 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@Slow
+@Nightly
+@SuppressSSL
+public class TlogReplayBufferedWhileIndexingTest extends AbstractFullDistribZkTestBase {
+
+ private List<StoppableIndexingThread> threads;
+
+ public TlogReplayBufferedWhileIndexingTest() throws Exception {
+ super();
+ sliceCount = 1;
+ fixShardCount(2);
+ schemaString = "schema15.xml"; // we need a string id
+ }
+
+ @BeforeClass
+ public static void beforeRestartWhileUpdatingTest() throws Exception {
+ System.setProperty("leaderVoteWait", "300000");
+ System.setProperty("solr.autoCommit.maxTime", "10000");
+ System.setProperty("solr.autoSoftCommit.maxTime", "3000");
+ if (System.getProperty("solr.hdfs.home") != null) useFactory("solr.StandardDirectoryFactory");
+ }
+
+ @AfterClass
+ public static void afterRestartWhileUpdatingTest() {
+ System.clearProperty("leaderVoteWait");
+ System.clearProperty("solr.autoCommit.maxTime");
+ System.clearProperty("solr.autoSoftCommit.maxTime");
+ }
+
+ @Test
+ public void test() throws Exception {
+ handle.clear();
+ handle.put("timestamp", SKIPVAL);
+
+ waitForRecoveriesToFinish(false);
+
+ int numThreads = 1;
+
+ threads = new ArrayList<>(numThreads);
+
+ ArrayList<JettySolrRunner> allJetty = new ArrayList<>();
+ allJetty.addAll(jettys);
+ allJetty.remove(shardToLeaderJetty.get("shard1").jetty);
+ assert allJetty.size() == 1 : allJetty.size();
+ ChaosMonkey.stop(allJetty.get(0));
+
+ StoppableIndexingThread indexThread;
+ for (int i = 0; i < numThreads; i++) {
+ indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), false, 50000, 1, false);
+ threads.add(indexThread);
+ indexThread.start();
+ }
+
+ Thread.sleep(2000);
+
+ ChaosMonkey.start(allJetty.get(0));
+
+ Thread.sleep(45000);
+
+ waitForThingsToLevelOut(320);
+
+ Thread.sleep(2000);
+
+ waitForRecoveriesToFinish(DEFAULT_COLLECTION, cloudClient.getZkStateReader(), false, true);
+
+ for (StoppableIndexingThread thread : threads) {
+ thread.safeStop();
+ thread.safeStop();
+ }
+
+ waitForThingsToLevelOut(30);
+
+ checkShardConsistency(false, false);
+ }
+
+ @Override
+ protected void indexDoc(SolrInputDocument doc) throws IOException,
+ SolrServerException {
+ cloudClient.add(doc);
+ }
+
+
+ @Override
+ public void distribTearDown() throws Exception {
+ // make sure threads have been stopped...
+ if (threads != null) {
+ for (StoppableIndexingThread thread : threads) {
+ thread.safeStop();
+ }
+ }
+
+ super.distribTearDown();
+ }
+
+ // skip the randoms - they can deadlock...
+ @Override
+ protected void indexr(Object... fields) throws Exception {
+ SolrInputDocument doc = new SolrInputDocument();
+ addFields(doc, fields);
+ addFields(doc, "rnd_b", true);
+ indexDoc(doc);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ec4c7231/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java
new file mode 100644
index 0000000..534bb90
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsTlogReplayBufferedWhileIndexingTest.java
@@ -0,0 +1,63 @@
+package org.apache.solr.cloud.hdfs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.cloud.TlogReplayBufferedWhileIndexingTest;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.carrotsearch.randomizedtesting.annotations.Nightly;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+@Slow
+@Nightly
+@ThreadLeakFilters(defaultFilters = true, filters = {
+ BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
+})
+public class HdfsTlogReplayBufferedWhileIndexingTest extends TlogReplayBufferedWhileIndexingTest {
+
+ public HdfsTlogReplayBufferedWhileIndexingTest() throws Exception {
+ super();
+ }
+
+ private static MiniDFSCluster dfsCluster;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+ System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
+ }
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
+ HdfsTestUtil.teardownClass(dfsCluster);
+ dfsCluster = null;
+ }
+
+
+ @Override
+ protected String getDataDir(String dataDir) throws IOException {
+ return HdfsTestUtil.getDataDir(dfsCluster, dataDir);
+ }
+
+}