You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@fluo.apache.org by kt...@apache.org on 2018/04/17 14:32:46 UTC
[fluo] branch master updated: fixes #1024 Do less work in the wait command (#1031)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push:
new 91884df fixes #1024 Do less work in the wait command (#1031)
91884df is described below
commit 91884df193b9819830bf21444ba1f42c09ba9c68
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Tue Apr 17 10:32:43 2018 -0400
fixes #1024 Do less work in the wait command (#1031)
---
.../java/org/apache/fluo/command/FluoWait.java | 113 +++++++++++++--------
.../java/org/apache/fluo/core/util/ScanUtil.java | 11 +-
.../core/worker/finder/hash/PartitionManager.java | 2 +-
.../fluo/core/worker/finder/hash/TableRange.java | 23 +++--
.../worker/finder/hash/PartitionManagerTest.java | 2 +-
.../core/worker/finder/hash/TableRangeTest.java | 5 +-
6 files changed, 99 insertions(+), 57 deletions(-)
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
index 2bba0c7..9c0b191 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -15,72 +15,105 @@
package org.apache.fluo.command;
-import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.core.client.FluoAdminImpl;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.apache.fluo.core.worker.finder.hash.TableRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.concurrent.TimeUnit.MINUTES;
+
public class FluoWait {
private static final Logger log = LoggerFactory.getLogger(FluoWait.class);
- private static final long MIN_SLEEP_SEC = 10;
- private static final long MAX_SLEEP_SEC = 300;
-
- private static long calculateSleep(long notifyCount) {
- long sleep = notifyCount / 500;
- if (sleep < MIN_SLEEP_SEC) {
- return MIN_SLEEP_SEC;
- } else if (sleep > MAX_SLEEP_SEC) {
- return MAX_SLEEP_SEC;
- }
- return sleep;
+ private static final long MIN_SLEEP_MS = 250;
+ private static final long MAX_SLEEP_MS = MINUTES.toMillis(5);
+
+ private static List<TableRange> getRanges(Environment env)
+ throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
+ List<TableRange> ranges =
+ TableRange.fromTexts(env.getConnector().tableOperations().listSplits(env.getTable()));
+ Collections.shuffle(ranges);
+ return ranges;
}
- private static long countNotifications(Environment env) {
- Scanner scanner;
+ private static boolean hasNotifications(Environment env, TableRange range)
+ throws TableNotFoundException {
+ Scanner scanner = null;
try {
scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
- } catch (TableNotFoundException e) {
- log.error("An exception was thrown -", e);
- throw new FluoException(e);
+ scanner.setRange(range.getRange());
+ Notification.configureScanner(scanner);
+
+ return scanner.iterator().hasNext();
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
}
+ }
+
+ /**
+ * Wait until a range has no notifications.
+ *
+ * @return true if notifications were ever seen while waiting
+ */
+ private static boolean waitTillNoNotifications(Environment env, TableRange range)
+ throws TableNotFoundException {
+ boolean sawNotifications = false;
+ long retryTime = MIN_SLEEP_MS;
- Notification.configureScanner(scanner);
+ log.debug("Scanning tablet {} for notifications", range);
- return Iterables.size(scanner);
+ long start = System.currentTimeMillis();
+ while (hasNotifications(env, range)) {
+ sawNotifications = true;
+ long sleepTime = Math.max(System.currentTimeMillis() - start, retryTime);
+ log.debug("Tablet {} had notfications, will rescan in {}ms", range, sleepTime);
+ UtilWaitThread.sleep(sleepTime);
+ retryTime = Math.min(MAX_SLEEP_MS, (long) (retryTime * 1.5));
+ start = System.currentTimeMillis();
+ }
+
+ return sawNotifications;
}
+ /**
+ * Wait until a scan of the table completes without seeing notifications AND without the Oracle
+ * issuing any timestamps during the scan.
+ */
private static void waitUntilFinished(FluoConfiguration config) {
try (Environment env = new Environment(config)) {
- log.info("The wait command will exit when all notifications are processed");
- while (true) {
+ List<TableRange> ranges = getRanges(env);
+
+ outer: while (true) {
long ts1 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
- long ntfyCount = countNotifications(env);
- long ts2 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
- if (ntfyCount == 0 && ts1 == (ts2 - 1)) {
- log.info("All processing has finished!");
- break;
+ for (TableRange range : ranges) {
+ boolean sawNotifications = waitTillNoNotifications(env, range);
+ if (sawNotifications) {
+ ranges = getRanges(env);
+ // This range had notifications. Processing those notifications may have created
+ // notifications in previously scanned ranges, so start over.
+ continue outer;
+ }
}
+ long ts2 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
- try {
- long sleepSec = calculateSleep(ntfyCount);
- log.info("{} notifications are still outstanding. Will try again in {} seconds...",
- ntfyCount, sleepSec);
- Thread.sleep(1000 * sleepSec);
- } catch (InterruptedException e) {
- log.error("Sleep was interrupted! Exiting...");
- System.exit(-1);
+ // Check to ensure the Oracle issued no timestamps during the scan for notifications.
+ if (ts2 - ts1 == 1) {
+ break;
}
}
- } catch (FluoException e) {
- log.error(e.getMessage());
- System.exit(-1);
} catch (Exception e) {
log.error("An exception was thrown -", e);
System.exit(-1);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 4690fd5..3cb9d49 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -26,6 +26,11 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import com.google.common.collect.Iterables;
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonIOException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.security.Authorizations;
@@ -40,12 +45,6 @@ import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
-import com.google.common.collect.Iterables;
-import com.google.gson.FieldNamingPolicy;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonIOException;
-
public class ScanUtil {
public static final String FLUO_VALUE = "value";
public static final String FLUO_COLUMN_VISIBILITY = "visibility";
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
index 4474c78..89647f6 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
@@ -199,7 +199,7 @@ public class PartitionManager {
List<Bytes> zkSplits = new ArrayList<>();
SerializedSplits.deserialize(zkSplits::add, zkSplitData);
- Collection<TableRange> tableRanges = TableRange.toTabletRanges(zkSplits);
+ Collection<TableRange> tableRanges = TableRange.fromBytes(zkSplits);
PartitionInfo newPI = getGroupInfo(me, children, tableRanges, groupSize);
setPartitionInfo(newPI);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
index a7517f8..a3c70bb 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -20,10 +20,12 @@ import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Stream;
import org.apache.accumulo.core.data.Range;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.core.util.ByteUtil;
+import org.apache.fluo.core.util.Hex;
import org.apache.hadoop.io.Text;
import static java.util.stream.Collectors.toList;
@@ -69,12 +71,14 @@ public class TableRange implements Comparable<TableRange> {
@Override
public String toString() {
- return getPrevEndRow() + " " + getEndRow();
- }
+ String per = prevEndRow == null ? "-INF" : Hex.encNonAscii(prevEndRow);
+ String er = endRow == null ? "+INF" : Hex.encNonAscii(endRow);
+ return "(" + per + " " + er + "]";
+ }
- public static Collection<TableRange> toTabletRanges(Collection<Bytes> rows) {
- List<Bytes> sortedRows = rows.stream().sorted().collect(toList());
+ private static List<TableRange> fromStream(Stream<Bytes> stream) {
+ List<Bytes> sortedRows = stream.sorted().collect(toList());
List<TableRange> tablets = new ArrayList<>(sortedRows.size() + 1);
for (int i = 0; i < sortedRows.size(); i++) {
tablets.add(new TableRange(i == 0 ? null : sortedRows.get(i - 1), sortedRows.get(i)));
@@ -83,9 +87,16 @@ public class TableRange implements Comparable<TableRange> {
tablets.add(new TableRange(
sortedRows.size() == 0 ? null : sortedRows.get(sortedRows.size() - 1), null));
return tablets;
+
}
+ public static List<TableRange> fromBytes(Collection<Bytes> rows) {
+ return fromStream(rows.stream());
+ }
+ public static List<TableRange> fromTexts(Collection<Text> rows) {
+ return fromStream(rows.stream().map(ByteUtil::toBytes));
+ }
public Range getRange() {
Text tper = Optional.ofNullable(prevEndRow).map(ByteUtil::toText).orElse(null);
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
index 3bf5307..8b38065 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
@@ -50,7 +50,7 @@ public class PartitionManagerTest {
Collection<Bytes> rows = IntStream.iterate(0, i -> i + 1000).limit(numSplits)
.mapToObj(i -> String.format("r%06d", i)).map(Bytes::of).collect(toList());
- Collection<TableRange> tablets = TableRange.toTabletRanges(rows);
+ Collection<TableRange> tablets = TableRange.fromBytes(rows);
Set<String> idCombos = new HashSet<>();
Map<Integer, RangeSet> groupTablets = new HashMap<>();
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
index 637186f..0018d09 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
@@ -69,8 +69,7 @@ public class TableRangeTest {
Bytes sp2 = Bytes.of("m1");
Bytes sp3 = Bytes.of("r1");
- Collection<TableRange> trc1 =
- new HashSet<>(TableRange.toTabletRanges(Arrays.asList(sp2, sp3, sp1)));
+ Collection<TableRange> trc1 = new HashSet<>(TableRange.fromBytes(Arrays.asList(sp2, sp3, sp1)));
Assert.assertEquals(4, trc1.size());
Assert.assertTrue(trc1.contains(new TableRange(null, sp1)));
@@ -78,7 +77,7 @@ public class TableRangeTest {
Assert.assertTrue(trc1.contains(new TableRange(sp2, sp3)));
Assert.assertTrue(trc1.contains(new TableRange(sp3, null)));
- Collection<TableRange> trc2 = new HashSet<>(TableRange.toTabletRanges(Collections.emptyList()));
+ Collection<TableRange> trc2 = new HashSet<>(TableRange.fromBytes(Collections.emptyList()));
Assert.assertEquals(1, trc2.size());
Assert.assertTrue(trc2.contains(new TableRange(null, null)));
}
--
To stop receiving notification emails like this one, please contact
kturner@apache.org.