You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by ct...@apache.org on 2017/11/10 14:48:35 UTC

[fluo] 02/02: Use NewTableConfiguration when creating tables

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git

commit eb0889b5609292370b4569c576d2aee87c86f04b
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Fri Nov 10 09:13:04 2017 -0500

    Use NewTableConfiguration when creating tables
    
    Use Accumulo's NewTableConfiguration when creating new application
    tables. This way, the initial table configuration, including locality
    group settings and compaction iterators, are already set on the table at
    the moment it is created, eliminating the possibility of race conditions
    related to setting configuration after the table is created.
    
    This change requires at least Accumulo 1.7.0.
    
    Travis-CI configuration was updated to test against the latest version
    of Accumulo (1.8.1) with the corresponding Thrift version (0.9.3)
    
    Include test to verify locality group serialization can be deserialized
    by Accumulo's public API, and adds some constants to refer to Accumulo
    properties and column family and locality group names.
---
 .travis.yml                                        |  4 +-
 .../apache/fluo/accumulo/util/AccumuloProps.java   |  7 ++-
 .../apache/fluo/accumulo/util/ColumnConstants.java |  6 +-
 .../org/apache/fluo/core/client/FluoAdminImpl.java | 72 +++++++++++++++-------
 .../fluo/integration/accumulo/TimeskippingIT.java  |  3 +-
 .../fluo/integration/client/FluoAdminImplIT.java   | 32 +++++++++-
 pom.xml                                            |  2 +-
 7 files changed, 95 insertions(+), 31 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index b44275b..7f0dcf0 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,8 +12,8 @@
 # the License.
 language: java
 jdk:
-  - oraclejdk8
-script: mvn verify javadoc:jar
+  - openjdk8
+script: mvn clean verify javadoc:jar -Daccumulo.version=1.8.1 -Dthrift.version=0.9.3
 notifications:
   irc:
     channels:
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
index e1966f0..c67b782 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -21,5 +21,8 @@ public class AccumuloProps {
   public static final String TABLE_CLASSPATH = "table.classpath.context";
   public static final String TABLE_BLOCKCACHE_ENABLED = "table.cache.block.enable";
   public static final String TABLE_FORMATTER_CLASS = "table.formatter";
+  public static final String TABLE_GROUP_PREFIX = "table.group.";
+  public static final String TABLE_GROUPS_ENABLED = "table.groups.enabled";
+  public static final Object TABLE_ITERATOR_PREFIX = "table.iterator.";
 
 }
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
index c38b9e6..7063adc 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ColumnConstants.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -32,6 +32,8 @@ public class ColumnConstants {
   public static final long DATA_PREFIX = 0xa000000000000000L;
   public static final long TIMESTAMP_MASK = 0x1fffffffffffffffL;
   public static final Bytes NOTIFY_CF = Bytes.of("ntfy");
+  public static final String NOTIFY_LOCALITY_GROUP_NAME = "notify";
+  public static final Bytes GC_CF = Bytes.of("gc");
 
   private ColumnConstants() {}
 
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index fc2701f..158b7d1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -27,8 +27,8 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Pattern;
 
@@ -36,7 +36,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.iterators.GarbageCollectionIterator;
 import org.apache.fluo.accumulo.iterators.NotificationIterator;
@@ -47,6 +49,7 @@ import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.core.impl.FluoConfigurationImpl;
 import org.apache.fluo.core.observer.ObserverUtil;
@@ -56,7 +59,6 @@ import org.apache.fluo.core.worker.finder.hash.PartitionManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.slf4j.Logger;
@@ -77,7 +79,6 @@ public class FluoAdminImpl implements FluoAdmin {
   public FluoAdminImpl(FluoConfiguration config) {
     this.config = config;
 
-
     appRootDir = ZookeeperUtil.parseRoot(config.getAppZookeepers());
     rootCurator = CuratorUtil.newRootFluoCurator(config);
     rootCurator.start();
@@ -145,7 +146,8 @@ public class FluoAdminImpl implements FluoAdmin {
     }
 
     try {
-      initialize(conn);
+      initializeApplicationInZooKeeper(conn);
+      Map<String, String> ntcProps = initializeApplicationTableProps();
 
       String accumuloJars;
       if (!config.getAccumuloJars().trim().isEmpty()) {
@@ -167,8 +169,7 @@ public class FluoAdminImpl implements FluoAdmin {
         String contextName = "fluo-" + config.getApplicationName();
         conn.instanceOperations().setProperty(
             AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, accumuloClasspath);
-        conn.tableOperations().setProperty(config.getAccumuloTable(), AccumuloProps.TABLE_CLASSPATH,
-            contextName);
+        ntcProps.put(AccumuloProps.TABLE_CLASSPATH, contextName);
       }
 
       if (config.getObserverJarsUrl().isEmpty() && !config.getObserverInitDir().trim().isEmpty()) {
@@ -176,8 +177,11 @@ public class FluoAdminImpl implements FluoAdmin {
         config.setObserverJarsUrl(observerUrl);
       }
 
-      conn.tableOperations().setProperty(config.getAccumuloTable(),
-          AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
+      ntcProps.put(AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
+
+      NewTableConfiguration ntc = new NewTableConfiguration().withoutDefaultIterators();
+      ntc.setProperties(ntcProps);
+      conn.tableOperations().create(config.getAccumuloTable(), ntc);
 
       updateSharedConfig();
     } catch (NodeExistsException nee) {
@@ -190,7 +194,7 @@ public class FluoAdminImpl implements FluoAdmin {
     }
   }
 
-  private void initialize(Connector conn) throws Exception {
+  private void initializeApplicationInZooKeeper(Connector conn) throws Exception {
 
     final String accumuloInstanceName = conn.getInstance().getInstanceName();
     final String accumuloInstanceID = conn.getInstance().getInstanceID();
@@ -221,23 +225,49 @@ public class FluoAdminImpl implements FluoAdmin {
         CuratorUtil.NodeExistsPolicy.FAIL);
     CuratorUtil.putData(curator, ZookeeperPath.ORACLE_GC_TIMESTAMP, new byte[] {'0'},
         CuratorUtil.NodeExistsPolicy.FAIL);
+  }
 
-    conn.tableOperations().create(config.getAccumuloTable(), false);
-    Map<String, Set<Text>> groups = new HashMap<>();
-    groups.put("notify", Collections.singleton(ByteUtil.toText(ColumnConstants.NOTIFY_CF)));
-    conn.tableOperations().setLocalityGroups(config.getAccumuloTable(), groups);
-
-    IteratorSetting gcIter = new IteratorSetting(10, "gc", GarbageCollectionIterator.class);
-    GarbageCollectionIterator.setZookeepers(gcIter, config.getAppZookeepers());
+  private String encodeColumnFamily(Bytes cf) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < cf.length(); i++) {
+      int c = 0xff & cf.byteAt(i);
+      if (c == '\\') {
+        sb.append("\\\\");
+      } else if (c >= 32 && c <= 126 && c != ',') {
+        sb.append((char) c);
+      } else {
+        sb.append("\\x").append(String.format("%02X", c));
+      }
+    }
+    return sb.toString();
+  }
 
-    conn.tableOperations().attachIterator(config.getAccumuloTable(), gcIter,
-        EnumSet.of(IteratorUtil.IteratorScope.majc, IteratorUtil.IteratorScope.minc));
+  private Map<String, String> initializeApplicationTableProps() {
+    Map<String, String> ntcProps = new HashMap<>();
+    ntcProps.put(AccumuloProps.TABLE_GROUP_PREFIX + ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
+        encodeColumnFamily(ColumnConstants.NOTIFY_CF));
+    ntcProps.put(AccumuloProps.TABLE_GROUPS_ENABLED, ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME);
 
+    IteratorSetting gcIter =
+        new IteratorSetting(10, ColumnConstants.GC_CF.toString(), GarbageCollectionIterator.class);
+    GarbageCollectionIterator.setZookeepers(gcIter, config.getAppZookeepers());
     // the order relative to gc iter should not matter
-    IteratorSetting ntfyIter = new IteratorSetting(11, "ntfy", NotificationIterator.class);
+    IteratorSetting ntfyIter =
+        new IteratorSetting(11, ColumnConstants.NOTIFY_CF.toString(), NotificationIterator.class);
+
+    for (IteratorSetting setting : new IteratorSetting[] {gcIter, ntfyIter}) {
+      for (IteratorScope scope : EnumSet.of(IteratorUtil.IteratorScope.majc,
+          IteratorUtil.IteratorScope.minc)) {
+        String root = String.format("%s%s.%s", AccumuloProps.TABLE_ITERATOR_PREFIX,
+            scope.name().toLowerCase(), setting.getName());
+        for (Entry<String, String> prop : setting.getOptions().entrySet()) {
+          ntcProps.put(root + ".opt." + prop.getKey(), prop.getValue());
+        }
+        ntcProps.put(root, setting.getPriority() + "," + setting.getIteratorClass());
+      }
+    }
 
-    conn.tableOperations().attachIterator(config.getAccumuloTable(), ntfyIter,
-        EnumSet.of(IteratorUtil.IteratorScope.majc, IteratorUtil.IteratorScope.minc));
+    return ntcProps;
   }
 
   @Override
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
index eaecc29..9aeca79 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
@@ -20,6 +20,7 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.integration.ITBase;
@@ -39,7 +40,7 @@ public class TimeskippingIT extends ITBase {
   @Test
   public void testTimestampSkippingIterPerformance() throws Exception {
 
-    conn.tableOperations().create("ttsi", false);
+    conn.tableOperations().create("ttsi", new NewTableConfiguration().withoutDefaultIterators());
 
     BatchWriter bw = conn.createBatchWriter("ttsi", new BatchWriterConfig());
     Mutation m = new Mutation("r1");
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 4a6a5cf..02b8edc 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -15,7 +15,16 @@
 
 package org.apache.fluo.integration.client;
 
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
@@ -26,6 +35,7 @@ import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.fluo.core.util.CuratorUtil;
 import org.apache.fluo.integration.ITBaseImpl;
+import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -92,6 +102,23 @@ public class FluoAdminImplIT extends ITBaseImpl {
       InitializationOptions opts =
           new InitializationOptions().setClearZookeeper(true).setClearTable(true);
       admin.initialize(opts);
+
+      // verify locality groups were set on the table
+      Instance inst =
+          new ZooKeeperInstance(config.getAccumuloInstance(), config.getAccumuloZookeepers());
+      Connector conn = inst.getConnector(config.getAccumuloUser(),
+          new PasswordToken(config.getAccumuloPassword()));
+      Map<String, Set<Text>> localityGroups =
+          conn.tableOperations().getLocalityGroups(config.getAccumuloTable());
+      Assert.assertEquals("Unexpected locality group count.", 1, localityGroups.size());
+      Entry<String, Set<Text>> localityGroup = localityGroups.entrySet().iterator().next();
+      Assert.assertEquals("'notify' locality group not found.",
+          ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, localityGroup.getKey());
+      Assert.assertEquals("'notify' locality group does not contain exactly 1 column family.", 1,
+          localityGroup.getValue().size());
+      Text colFam = localityGroup.getValue().iterator().next();
+      Assert.assertTrue("'notify' locality group does not contain the correct column family.",
+          ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0, colFam.getLength()));
     }
 
     try (FluoClientImpl client = new FluoClientImpl(localConfig)) {
@@ -156,4 +183,5 @@ public class FluoAdminImplIT extends ITBaseImpl {
       Assert.assertNotNull(curator.checkExists().forPath(ZookeeperUtil.parseRoot(zk + longPath)));
     }
   }
+
 }
diff --git a/pom.xml b/pom.xml
index 05b3afe..ee9021e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
     <url>https://github.com/apache/fluo/issues</url>
   </issueManagement>
   <properties>
-    <accumulo.version>1.6.5</accumulo.version>
+    <accumulo.version>1.7.3</accumulo.version>
     <curator.version>2.7.1</curator.version>
     <dropwizard.version>0.8.1</dropwizard.version>
     <findbugs.maxRank>11</findbugs.maxRank>

-- 
To stop receiving notification emails like this one, please contact
"commits@fluo.apache.org" <co...@fluo.apache.org>.