You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2018/05/08 17:38:05 UTC

[fluo] branch master updated: fixes #1026 enable scanning notifications (#1032)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new b89a11b  fixes #1026 enable scanning notifications (#1032)
b89a11b is described below

commit b89a11b4fa44b835c4962a8e86a591d3c14c96dd
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Tue May 8 13:38:01 2018 -0400

    fixes #1026 enable scanning notifications (#1032)
---
 .travis.yml                                        |   2 +-
 .../org/apache/fluo/cluster/runner/AppRunner.java  |  11 +-
 .../java/org/apache/fluo/command/FluoScan.java     |  26 ++++-
 .../apache/fluo/core/util/NotificationScanner.java |  88 ++++++++++++++++
 .../java/org/apache/fluo/core/util/ScanUtil.java   | 113 ++++++++++++++------
 .../fluo/core/util/NotificationScannerTest.java    | 114 +++++++++++++++++++++
 6 files changed, 318 insertions(+), 36 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index b071db0..c996f5c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,6 +27,6 @@ jdk:
   - openjdk8
 env:
   - ADDITIONAL_MAVEN_OPTS=
-  - ADDITIONAL_MAVEN_OPTS=-Daccumulo.version=1.8.1
+  - ADDITIONAL_MAVEN_OPTS=-Daccumulo.version=1.9.0
 script:
   - mvn clean verify javadoc:jar $ADDITIONAL_MAVEN_OPTS
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index 3ec090f..355e16b 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -19,6 +19,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 
 import javax.inject.Provider;
@@ -37,6 +38,7 @@ import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.util.ScanUtil;
+import org.apache.fluo.core.util.ScanUtil.ScanFlags;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,8 +235,13 @@ public abstract class AppRunner {
     }
 
     public ScanUtil.ScanOpts getScanOpts() {
-      return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, help,
-          hexEncNonAscii, scanAccumuloTable, false);
+      EnumSet<ScanFlags> flags = EnumSet.noneOf(ScanFlags.class);
+
+      ScanUtil.setFlag(flags, help, ScanFlags.HELP);
+      ScanUtil.setFlag(flags, hexEncNonAscii, ScanFlags.HEX);
+      ScanUtil.setFlag(flags, scanAccumuloTable, ScanFlags.ACCUMULO);
+
+      return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, flags);
     }
   }
 }
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
index bcd03c0..3844e4d 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
@@ -17,12 +17,14 @@ package org.apache.fluo.command;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 
 import com.beust.jcommander.Parameter;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.util.ScanUtil;
+import org.apache.fluo.core.util.ScanUtil.ScanFlags;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -59,6 +61,9 @@ public class FluoScan {
         description = "Export key/values stored in Accumulo as JSON file.")
     public boolean exportAsJson = false;
 
+    @Parameter(names = "--ntfy", help = true, description = "Scan active notifications")
+    public boolean scanNtfy = false;
+
     public String getStartRow() {
       return startRow;
     }
@@ -90,11 +95,23 @@ public class FluoScan {
         throw new IllegalArgumentException(
             "Both \"--raw\" and \"--json\" can not be set together.");
       }
+
+      if (this.scanAccumuloTable && this.scanNtfy) {
+        throw new IllegalArgumentException(
+            "Both \"--raw\" and \"--ntfy\" can not be set together.");
+      }
     }
 
     public ScanUtil.ScanOpts getScanOpts() {
-      return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, help,
-          hexEncNonAscii, scanAccumuloTable, exportAsJson);
+      EnumSet<ScanFlags> flags = EnumSet.noneOf(ScanFlags.class);
+
+      ScanUtil.setFlag(flags, help, ScanFlags.HELP);
+      ScanUtil.setFlag(flags, hexEncNonAscii, ScanFlags.HEX);
+      ScanUtil.setFlag(flags, scanAccumuloTable, ScanFlags.ACCUMULO);
+      ScanUtil.setFlag(flags, exportAsJson, ScanFlags.JSON);
+      ScanUtil.setFlag(flags, scanNtfy, ScanFlags.NTFY);
+
+      return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, flags);
     }
 
     public static ScanOptions parse(String[] args) {
@@ -114,14 +131,17 @@ public class FluoScan {
     FluoConfiguration config = CommandUtil.resolveFluoConfig();
     config.setApplicationName(options.getApplicationName());
     options.overrideFluoConfig(config);
-    CommandUtil.verifyAppRunning(config);
 
     try {
       options.overrideFluoConfig(config);
       if (options.scanAccumuloTable) {
         config = FluoAdminImpl.mergeZookeeperConfig(config);
         ScanUtil.scanAccumulo(options.getScanOpts(), config, System.out);
+      } else if (options.scanNtfy) {
+        config = FluoAdminImpl.mergeZookeeperConfig(config);
+        ScanUtil.scanNotifications(options.getScanOpts(), config, System.out);
       } else {
+        CommandUtil.verifyAppRunning(config);
         ScanUtil.scanFluo(options.getScanOpts(), config, System.out);
       }
     } catch (RuntimeException | IOException e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java b/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java
new file mode 100644
index 0000000..04afc13
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/NotificationScanner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.core.impl.Notification;
+
+import static java.util.stream.Collectors.toSet;
+
+public class NotificationScanner implements CellScanner {
+
+  private Iterable<Entry<Key, Value>> scanner;
+  private Predicate<RowColumnValue> filter;
+
+  private static Predicate<RowColumnValue> createColumnFilter(Collection<Column> allColumns) {
+    if (allColumns.size() == 0) {
+      return rcv -> true;
+    } else {
+      Set<Bytes> families = allColumns.stream().filter(col -> !col.isQualifierSet())
+          .map(col -> col.getFamily()).collect(toSet());
+      Set<Column> columns =
+          allColumns.stream().filter(col -> col.isQualifierSet()).collect(toSet());
+
+      if (families.size() == 0) {
+        return rcv -> columns.contains(rcv.getColumn());
+      } else if (columns.size() == 0) {
+        return rcv -> families.contains(rcv.getColumn().getFamily());
+      } else {
+        return rcv -> families.contains(rcv.getColumn().getFamily())
+            || columns.contains(rcv.getColumn());
+      }
+    }
+  }
+
+  NotificationScanner(Scanner scanner, Collection<Column> columns) {
+    this(scanner, createColumnFilter(columns));
+  }
+
+  NotificationScanner(Scanner scanner, Predicate<RowColumnValue> filter) {
+    scanner.clearColumns();
+    Notification.configureScanner(scanner);
+    this.scanner = scanner;
+    this.filter = filter;
+  }
+
+  @VisibleForTesting
+  NotificationScanner(Iterable<Entry<Key, Value>> scanner, Collection<Column> columns) {
+    this.scanner = scanner;
+    this.filter = createColumnFilter(columns);
+  }
+
+  @Override
+  public Iterator<RowColumnValue> iterator() {
+    Iterator<RowColumnValue> iter = Iterators.transform(scanner.iterator(), entry -> {
+      Notification n = Notification.from(entry.getKey());
+      return new RowColumnValue(n.getRow(), n.getColumn(), Bytes.of(entry.getValue().get()));
+    });
+
+    return Iterators.filter(iter, rcv -> filter.test(rcv));
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 3cb9d49..a6c8e68 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -20,6 +20,7 @@ import java.io.PrintStream;
 import java.text.DateFormat;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -33,6 +34,7 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.JsonIOException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.accumulo.format.FluoFormatter;
 import org.apache.fluo.api.client.FluoClient;
@@ -112,6 +114,31 @@ public class ScanUtil {
     }
   }
 
+
+  private static void scan(ScanOpts options, PrintStream out, CellScanner cellScanner) {
+    Function<Bytes, String> encoder = getEncoder(options);
+
+    if (options.exportAsJson) {
+      generateJson(cellScanner, encoder, out);
+    } else {
+      for (RowColumnValue rcv : cellScanner) {
+        out.print(encoder.apply(rcv.getRow()));
+        out.print(' ');
+        out.print(encoder.apply(rcv.getColumn().getFamily()));
+        out.print(' ');
+        out.print(encoder.apply(rcv.getColumn().getQualifier()));
+        out.print(' ');
+        out.print(encoder.apply(rcv.getColumn().getVisibility()));
+        out.print("\t");
+        out.print(encoder.apply(rcv.getValue()));
+        out.println();
+        if (out.checkError()) {
+          break;
+        }
+      }
+    }
+  }
+
   public static void scanFluo(ScanOpts options, FluoConfiguration sConfig, PrintStream out)
       throws IOException {
 
@@ -121,27 +148,34 @@ public class ScanUtil {
         Span span = getSpan(options);
         Collection<Column> columns = getColumns(options);
         CellScanner cellScanner = s.scanner().over(span).fetch(columns).build();
-        Function<Bytes, String> encoder = getEncoder(options);
 
-        if (options.exportAsJson) {
-          generateJson(cellScanner, encoder, out);
-        } else {
-          for (RowColumnValue rcv : cellScanner) {
-            out.print(encoder.apply(rcv.getRow()));
-            out.print(' ');
-            out.print(encoder.apply(rcv.getColumn().getFamily()));
-            out.print(' ');
-            out.print(encoder.apply(rcv.getColumn().getQualifier()));
-            out.print(' ');
-            out.print(encoder.apply(rcv.getColumn().getVisibility()));
-            out.print("\t");
-            out.print(encoder.apply(rcv.getValue()));
-            out.println();
-            if (out.checkError()) {
-              break;
-            }
-          }
-        }
+        scan(options, out, cellScanner);
+      }
+    }
+  }
+
+  public static void scanNotifications(ScanOpts options, FluoConfiguration sConfig, PrintStream out)
+      throws IOException {
+
+    Connector conn = AccumuloUtil.getConnector(sConfig);
+
+    Span span = getSpan(options);
+    Collection<Column> columns = getColumns(options);
+
+    Scanner scanner = null;
+    try {
+      scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
+
+      scanner.setRange(SpanUtil.toRange(span));
+
+      NotificationScanner ntfyScanner = new NotificationScanner(scanner, columns);
+
+      scan(options, out, ntfyScanner);
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (scanner != null) {
+        scanner.close();
       }
     }
   }
@@ -202,6 +236,18 @@ public class ScanUtil {
     }
   }
 
+  public static enum ScanFlags {
+    HELP,
+    // hex encode non ascii
+    HEX,
+    // scan accumulo table directly
+    ACCUMULO,
+    // encode output as json
+    JSON,
+    // scan notification
+    NTFY
+  }
+
   public static class ScanOpts {
 
     private String startRow;
@@ -209,23 +255,24 @@ public class ScanUtil {
     private List<String> columns;
     private String exactRow;
     private String rowPrefix;
-    public boolean help;
-    public boolean hexEncNonAscii = true;
-    public boolean scanAccumuloTable = false;
-    public boolean exportAsJson = false;
+    public final boolean help;
+    public final boolean hexEncNonAscii;
+    public final boolean scanAccumuloTable;
+    public final boolean exportAsJson;
+    public final boolean scanNtfy;
 
     public ScanOpts(String startRow, String endRow, List<String> columns, String exactRow,
-        String rowPrefix, boolean help, boolean hexEncNonAscii, boolean scanAccumuloTable,
-        boolean exportAsJson) {
+        String rowPrefix, EnumSet<ScanFlags> flags) {
       this.startRow = startRow;
       this.endRow = endRow;
       this.columns = columns;
       this.exactRow = exactRow;
       this.rowPrefix = rowPrefix;
-      this.help = help;
-      this.hexEncNonAscii = hexEncNonAscii;
-      this.scanAccumuloTable = scanAccumuloTable;
-      this.exportAsJson = exportAsJson;
+      this.help = flags.contains(ScanFlags.HELP);
+      this.hexEncNonAscii = flags.contains(ScanFlags.HEX);
+      this.scanAccumuloTable = flags.contains(ScanFlags.ACCUMULO);
+      this.exportAsJson = flags.contains(ScanFlags.JSON);
+      this.scanNtfy = flags.contains(ScanFlags.NTFY);
     }
 
     public String getStartRow() {
@@ -251,4 +298,10 @@ public class ScanUtil {
       return columns;
     }
   }
+
+  public static void setFlag(EnumSet<ScanFlags> flags, boolean b, ScanFlags flag) {
+    if (b) {
+      flags.add(flag);
+    }
+  }
 }
diff --git a/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java b/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java
new file mode 100644
index 0000000..0228243
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/core/util/NotificationScannerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.util;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.NotificationUtil;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NotificationScannerTest {
+
+  private static class Data implements Iterable<Entry<Key, Value>> {
+    TreeMap<Key, Value> data = new TreeMap<>();
+
+    void putNtfy(String row, String fam, String qual) {
+      byte[] r = row.getBytes(StandardCharsets.UTF_8);
+      byte[] f = ColumnConstants.NOTIFY_CF.toArray();
+      byte[] q = NotificationUtil.encodeCol(new Column(fam, qual));
+
+      data.put(new Key(r, f, q, new byte[0], 42L), new Value(new byte[0]));
+    }
+
+    @Override
+    public Iterator<Entry<Key, Value>> iterator() {
+      return data.entrySet().iterator();
+    }
+  }
+
+
+  /**
+   * When scanning notifications, column filtering is done on the client side. This test ensures
+   * that filtering works correctly.
+   */
+  @Test
+  public void testColumnFiltering() {
+
+    Data data = new Data();
+    data.putNtfy("r001", "f8", "q2");
+    data.putNtfy("r001", "f9", "q1");
+    data.putNtfy("r002", "f8", "q2");
+    data.putNtfy("r002", "f8", "q3");
+    data.putNtfy("r004", "f9", "q3");
+    data.putNtfy("r004", "f9", "q4");
+
+    HashSet<RowColumnValue> expected = new HashSet<>();
+    expected.add(new RowColumnValue("r001", new Column("f8", "q2"), ""));
+    expected.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+    expected.add(new RowColumnValue("r002", new Column("f8", "q2"), ""));
+    expected.add(new RowColumnValue("r002", new Column("f8", "q3"), ""));
+    expected.add(new RowColumnValue("r004", new Column("f9", "q3"), ""));
+    expected.add(new RowColumnValue("r004", new Column("f9", "q4"), ""));
+
+    NotificationScanner scanner = new NotificationScanner(data, Collections.emptySet());
+    HashSet<RowColumnValue> actual = new HashSet<>();
+    scanner.forEach(actual::add);
+    Assert.assertEquals(expected, actual);
+
+    scanner = new NotificationScanner(data, Arrays.asList(new Column("f9")));
+    actual.clear();
+    scanner.forEach(actual::add);
+    HashSet<RowColumnValue> expected2 = new HashSet<>();
+    expected.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f9"))
+        .forEach(expected2::add);
+    Assert.assertEquals(expected2, actual);
+
+    scanner = new NotificationScanner(data, Arrays.asList(new Column("f9"), new Column("f8")));
+    actual.clear();
+    scanner.forEach(actual::add);
+    Assert.assertEquals(expected, actual);
+
+    scanner = new NotificationScanner(data, Arrays.asList(new Column("f9", "q1")));
+    actual.clear();
+    scanner.forEach(actual::add);
+    expected2.clear();
+    expected2.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+    Assert.assertEquals(expected2, actual);
+
+    scanner =
+        new NotificationScanner(data, Arrays.asList(new Column("f9", "q1"), new Column("f8")));
+    actual.clear();
+    scanner.forEach(actual::add);
+    expected2.clear();
+    expected2.add(new RowColumnValue("r001", new Column("f9", "q1"), ""));
+    expected2.add(new RowColumnValue("r001", new Column("f8", "q2"), ""));
+    expected2.add(new RowColumnValue("r002", new Column("f8", "q2"), ""));
+    expected2.add(new RowColumnValue("r002", new Column("f8", "q3"), ""));
+    Assert.assertEquals(expected2, actual);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
kturner@apache.org.