You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2016/07/15 22:07:53 UTC
[05/10] incubator-fluo-recipes git commit: Updated package names in
core module
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
new file mode 100644
index 0000000..7764e67
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.DefaultedMap;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.core.types.TypeLayer.Data;
+import org.apache.fluo.recipes.core.types.TypeLayer.FamilyMethods;
+import org.apache.fluo.recipes.core.types.TypeLayer.QualifierMethods;
+import org.apache.fluo.recipes.core.types.TypeLayer.RowMethods;
+
+// TODO need to refactor column to use Encoder
+
+/**
+ * A {@link SnapshotBase} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedSnapshotBase implements SnapshotBase {
+
+ private SnapshotBase snapshot;
+ private Encoder encoder;
+ private TypeLayer tl;
+
+  /**
+   * A {@link Value} that can additionally be given a visibility. Every {@code vis} overload
+   * converts its argument to {@link Bytes}, stores it in the shared {@link Data} and returns a new
+   * {@link Value} bound to that same data object.
+   *
+   * @since 1.0.0
+   */
+  public class VisibilityMethods extends Value {
+
+    VisibilityMethods(Data data) {
+      super(data);
+    }
+
+    public Value vis(Bytes cv) {
+      return withVisibility(cv);
+    }
+
+    public Value vis(byte[] cv) {
+      return withVisibility(Bytes.of(cv));
+    }
+
+    public Value vis(ByteBuffer bb) {
+      return withVisibility(Bytes.of(bb));
+    }
+
+    public Value vis(String cv) {
+      return withVisibility(Bytes.of(cv));
+    }
+
+    // Stores the visibility on the shared data, then wraps that data in a fresh Value.
+    private Value withVisibility(Bytes visibility) {
+      data.vis = visibility;
+      return new Value(data);
+    }
+  }
+
+ /**
+ * @since 1.0.0
+ */
+ public class Value {
+ private Bytes bytes;
+ private boolean gotBytes = false;
+ Data data;
+
+ public Bytes getBytes() {
+ if (!gotBytes) {
+ try {
+ bytes = snapshot.get(data.row, data.getCol());
+ gotBytes = true;
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ return bytes;
+ }
+
+ private Value(Bytes bytes) {
+ this.bytes = bytes;
+ this.gotBytes = true;
+ }
+
+ private Value(Data data) {
+ this.data = data;
+ this.gotBytes = false;
+ }
+
+ public Integer toInteger() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeInteger(getBytes());
+ }
+
+ public int toInteger(int defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeInteger(getBytes());
+ }
+
+ public Long toLong() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeLong(getBytes());
+ }
+
+ public long toLong(long defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeLong(getBytes());
+ }
+
+ @Override
+ public String toString() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeString(getBytes());
+ }
+
+ public String toString(String defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeString(getBytes());
+ }
+
+ public Float toFloat() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeFloat(getBytes());
+ }
+
+ public float toFloat(float defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeFloat(getBytes());
+ }
+
+ public Double toDouble() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeDouble(getBytes());
+ }
+
+ public double toDouble(double defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeDouble(getBytes());
+ }
+
+ public Boolean toBoolean() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeBoolean(getBytes());
+ }
+
+ public boolean toBoolean(boolean defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeBoolean(getBytes());
+ }
+
+ public byte[] toBytes() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return getBytes().toArray();
+ }
+
+ public byte[] toBytes(byte[] defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return getBytes().toArray();
+ }
+
+ public ByteBuffer toByteBuffer() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return ByteBuffer.wrap(getBytes().toArray());
+ }
+
+ public ByteBuffer toByteBuffer(ByteBuffer defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return toByteBuffer();
+ }
+
+ @Override
+ public int hashCode() {
+ if (getBytes() == null) {
+ return 0;
+ }
+
+ return getBytes().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof Value) {
+ Value ov = (Value) o;
+ if (getBytes() == null) {
+ return ov.getBytes() == null;
+ } else {
+ return getBytes().equals(ov.getBytes());
+ }
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Qualifier step of the typed read builder. It extends {@link QualifierMethods} and supplies a
+ * {@link VisibilityMethods} for the {@link Data} it is handed.
+ *
+ * @since 1.0.0
+ */
+ public class ValueQualifierBuilder extends QualifierMethods<VisibilityMethods> {
+
+ ValueQualifierBuilder(Data data) {
+ tl.super(data); // the superclass is an inner class of TypeLayer, so it is bound to tl
+ }
+
+ @Override
+ VisibilityMethods create(Data data) {
+ return new VisibilityMethods(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class ValueFamilyMethods extends FamilyMethods<ValueQualifierBuilder, Value> {
+
+ ValueFamilyMethods(Data data) {
+ tl.super(data);
+ }
+
+ @Override
+ ValueQualifierBuilder create1(Data data) {
+ return new ValueQualifierBuilder(data);
+ }
+
+ @Override
+ Value create2(Data data) {
+ return new Value(data);
+ }
+
+ public Map<Column, Value> columns(Set<Column> columns) {
+ try {
+ return wrap(snapshot.get(data.row, columns));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Map<Column, Value> columns(Column... columns) {
+ try {
+ return wrap(snapshot.get(data.row, new HashSet<>(Arrays.asList(columns))));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class MapConverter {
+ private Collection<Bytes> rows;
+ private Set<Column> columns;
+
+ public MapConverter(Collection<Bytes> rows, Set<Column> columns) {
+ this.rows = rows;
+ this.columns = columns;
+ }
+
+ private Map<Bytes, Map<Column, Bytes>> getInput() {
+ try {
+ return snapshot.get(rows, columns);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private Map wrap2(Map m) {
+ return Collections.unmodifiableMap(DefaultedMap.decorate(m, new DefaultedMap(new Value(
+ (Bytes) null))));
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<String, Map<Column, Value>> toStringMap() {
+ Map<Bytes, Map<Column, Bytes>> in = getInput();
+ Map<String, Map<Column, Value>> out = new HashMap<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+ out.put(encoder.decodeString(rowEntry.getKey()), wrap(rowEntry.getValue()));
+ }
+
+ return wrap2(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<Long, Map<Column, Value>> toLongMap() {
+ Map<Bytes, Map<Column, Bytes>> in = getInput();
+ Map<Long, Map<Column, Value>> out = new HashMap<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+ out.put(encoder.decodeLong(rowEntry.getKey()), wrap(rowEntry.getValue()));
+ }
+
+ return wrap2(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<Integer, Map<Column, Value>> toIntegerMap() {
+ Map<Bytes, Map<Column, Bytes>> in = getInput();
+ Map<Integer, Map<Column, Value>> out = new HashMap<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+ out.put(encoder.decodeInteger(rowEntry.getKey()), wrap(rowEntry.getValue()));
+ }
+
+ return wrap2(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<Bytes, Map<Column, Value>> toBytesMap() {
+ Map<Bytes, Map<Column, Bytes>> in = getInput();
+ Map<Bytes, Map<Column, Value>> out = new HashMap<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+ out.put(rowEntry.getKey(), wrap(rowEntry.getValue()));
+ }
+
+ return wrap2(out);
+ }
+ }
+
+  /**
+   * Builder stage that holds the selected rows and waits for the columns to read from them.
+   *
+   * @since 1.0.0
+   */
+  public class ColumnsMethods {
+    private final Collection<Bytes> rows;
+
+    public ColumnsMethods(Collection<Bytes> rows) {
+      this.rows = rows;
+    }
+
+    /**
+     * @param columns columns to read from every selected row
+     * @return a converter over the selected rows and the given columns
+     */
+    public MapConverter columns(Set<Column> columns) {
+      return new MapConverter(rows, columns);
+    }
+
+    /**
+     * Varargs form of {@link #columns(Set)}; the columns are collected into a set first.
+     */
+    public MapConverter columns(Column... columns) {
+      Set<Column> requested = new HashSet<>(Arrays.asList(columns));
+      return columns(requested);
+    }
+  }
+
+  /**
+   * Entry stage of the typed read builder. In addition to the single-row methods inherited from
+   * {@link RowMethods}, it offers multi-row selection: the {@code rowsX} methods convert each row
+   * to {@link Bytes} and hand the result to {@link #rows(Collection)}.
+   *
+   * @since 1.0.0
+   */
+  public class ValueRowMethods extends RowMethods<ValueFamilyMethods> {
+
+    ValueRowMethods() {
+      tl.super();
+    }
+
+    @Override
+    ValueFamilyMethods create(Data data) {
+      return new ValueFamilyMethods(data);
+    }
+
+    public ColumnsMethods rows(Collection<Bytes> rows) {
+      return new ColumnsMethods(rows);
+    }
+
+    public ColumnsMethods rows(Bytes... rows) {
+      return new ColumnsMethods(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsString(String... rows) {
+      return rowsString(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsString(Collection<String> rows) {
+      ArrayList<Bytes> encoded = new ArrayList<>();
+      rows.forEach(row -> encoded.add(encoder.encode(row)));
+      return rows(encoded);
+    }
+
+    public ColumnsMethods rowsLong(Long... rows) {
+      return rowsLong(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsLong(Collection<Long> rows) {
+      ArrayList<Bytes> encoded = new ArrayList<>();
+      rows.forEach(row -> encoded.add(encoder.encode(row)));
+      return rows(encoded);
+    }
+
+    public ColumnsMethods rowsInteger(Integer... rows) {
+      return rowsInteger(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsInteger(Collection<Integer> rows) {
+      ArrayList<Bytes> encoded = new ArrayList<>();
+      rows.forEach(row -> encoded.add(encoder.encode(row)));
+      return rows(encoded);
+    }
+
+    public ColumnsMethods rowsBytes(byte[]... rows) {
+      return rowsBytes(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsBytes(Collection<byte[]> rows) {
+      ArrayList<Bytes> converted = new ArrayList<>();
+      rows.forEach(row -> converted.add(Bytes.of(row)));
+      return rows(converted);
+    }
+
+    public ColumnsMethods rowsByteBuffers(ByteBuffer... rows) {
+      return rowsByteBuffers(Arrays.asList(rows));
+    }
+
+    public ColumnsMethods rowsByteBuffers(Collection<ByteBuffer> rows) {
+      ArrayList<Bytes> converted = new ArrayList<>();
+      rows.forEach(row -> converted.add(Bytes.of(row)));
+      return rows(converted);
+    }
+
+  }
+
+ TypedSnapshotBase(SnapshotBase snapshot, Encoder encoder, TypeLayer tl) { // package-private
+ this.snapshot = snapshot; // every read in this class goes through this snapshot
+ this.encoder = encoder; // used to encode rows and decode values
+ this.tl = tl; // outer instance for the TypeLayer builder classes extended above
+ }
+
+ @Override
+ public Bytes get(Bytes row, Column column) { // forwards to the wrapped snapshot
+ return snapshot.get(row, column);
+ }
+
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) { // forwards to the wrapped snapshot
+ return snapshot.get(row, columns);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) { // forwards
+ return snapshot.get(rowColumns);
+ }
+
+ @Override
+ public RowIterator get(ScannerConfiguration config) { // forwards to the wrapped snapshot
+ return snapshot.get(config);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) { // forwards
+ return snapshot.get(rows, columns);
+ }
+
+ public ValueRowMethods get() { // starting point of the typed read builder
+ return new ValueRowMethods();
+ }
+
+  /**
+   * Presents a column-to-bytes map as a read-only column-to-{@link Value} map. Columns missing
+   * from the input resolve to a {@link Value} that holds no bytes.
+   */
+  @SuppressWarnings({"unchecked"})
+  private Map<Column, Value> wrap(Map<Column, Bytes> map) {
+    Map<Column, Value> typedView = Maps.transformValues(map, raw -> new Value(raw));
+    Value absent = new Value((Bytes) null);
+    return Collections.unmodifiableMap(DefaultedMap.decorate(typedView, absent));
+  }
+
+ @Override
+ public long getStartTimestamp() { // forwards to the wrapped snapshot
+ return snapshot.getStartTimestamp();
+ }
+
+ @Override
+ public String gets(String row, Column column) { // forwards to the wrapped snapshot
+ return snapshot.gets(row, column);
+ }
+
+ @Override
+ public Map<Column, String> gets(String row, Set<Column> columns) { // forwards
+ return snapshot.gets(row, columns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) { // forwards
+ return snapshot.gets(rows, columns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) { // forwards
+ return snapshot.gets(rowColumns);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java
new file mode 100644
index 0000000..17631e0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+/**
+ * A {@link Transaction} that uses a {@link TypeLayer}
+ *
+ * <p>
+ * Reads and writes are inherited from {@link TypedTransactionBase}; {@link #commit()} and
+ * {@link #close()} are forwarded to the transaction passed to the constructor.
+ *
+ * @since 1.0.0
+ */
+public class TypedTransaction extends TypedTransactionBase implements Transaction {
+
+ private final Transaction closeTx; // same object that is handed to the superclass
+
+ @VisibleForTesting
+ protected TypedTransaction(Transaction tx, Encoder encoder, TypeLayer tl) {
+ super(tx, encoder, tl);
+ closeTx = tx;
+ }
+
+ @Override
+ public void commit() throws CommitException { // forwards to the wrapped transaction
+ closeTx.commit();
+ }
+
+ @Override
+ public void close() { // forwards to the wrapped transaction
+ closeTx.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java
new file mode 100644
index 0000000..69ec694
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.core.types;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+import org.apache.fluo.recipes.core.types.TypeLayer.Data;
+import org.apache.fluo.recipes.core.types.TypeLayer.FamilyMethods;
+import org.apache.fluo.recipes.core.types.TypeLayer.QualifierMethods;
+import org.apache.fluo.recipes.core.types.TypeLayer.RowMethods;
+
+/**
+ * A {@link TransactionBase} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedTransactionBase extends TypedSnapshotBase implements TransactionBase {
+
+ private final TransactionBase tx;
+ private final Encoder encoder;
+ private final TypeLayer tl;
+
+  /**
+   * Single-use write handle for one row/column. Exactly one operation (a {@code set}, an
+   * {@code increment}, {@link #delete()} or {@link #weaklyNotify()}) may be performed through a
+   * given instance; any later operation on the same instance fails with
+   * {@link IllegalStateException}.
+   *
+   * @since 1.0.0
+   */
+  public class Mutator {
+
+    // true once this mutator has performed its one operation
+    private boolean set = false;
+    Data data;
+
+    Mutator(Data data) {
+      this.data = data;
+    }
+
+    /**
+     * Verifies that this mutator has not been used yet.
+     *
+     * @throws IllegalStateException if an operation was already performed through this mutator
+     */
+    void checkNotSet() {
+      if (set) {
+        throw new IllegalStateException("Already set value");
+      }
+    }
+
+    /**
+     * Writes the given bytes to the row/column and marks this mutator as used.
+     *
+     * @param bytes value to write
+     * @throws AlreadySetException if value was previously set in transaction
+     */
+    public void set(Bytes bytes) throws AlreadySetException {
+      checkNotSet();
+      tx.set(data.row, data.getCol(), bytes);
+      set = true;
+    }
+
+    public void set(String s) throws AlreadySetException {
+      set(encoder.encode(s));
+    }
+
+    public void set(int i) throws AlreadySetException {
+      set(encoder.encode(i));
+    }
+
+    public void set(long l) throws AlreadySetException {
+      set(encoder.encode(l));
+    }
+
+    public void set(float f) throws AlreadySetException {
+      set(encoder.encode(f));
+    }
+
+    public void set(double d) throws AlreadySetException {
+      set(encoder.encode(d));
+    }
+
+    public void set(boolean b) throws AlreadySetException {
+      set(encoder.encode(b));
+    }
+
+    public void set(byte[] ba) throws AlreadySetException {
+      set(Bytes.of(ba));
+    }
+
+    public void set(ByteBuffer bb) throws AlreadySetException {
+      set(Bytes.of(bb));
+    }
+
+    /**
+     * Set an empty value
+     */
+    public void set() throws AlreadySetException {
+      set(Bytes.EMPTY);
+    }
+
+    /**
+     * Reads the current value of the row/column, adds i, sets the sum. If the row/column does not
+     * have a current value, then it defaults to zero. Like every other write, this marks the
+     * mutator as used.
+     *
+     * @param i Integer increment amount
+     * @throws AlreadySetException if value was previously set in transaction
+     * @throws IllegalStateException if this mutator was already used
+     */
+    public void increment(int i) throws AlreadySetException {
+      checkNotSet();
+      Bytes val = tx.get(data.row, data.getCol());
+      int v = 0;
+      if (val != null) {
+        v = encoder.decodeInteger(val);
+      }
+      tx.set(data.row, data.getCol(), encoder.encode(v + i));
+      // An increment writes the cell, so it must consume the mutator just like set() does.
+      set = true;
+    }
+
+    /**
+     * Reads the current value of the row/column, adds l, sets the sum. If the row/column does not
+     * have a current value, then it defaults to zero. Like every other write, this marks the
+     * mutator as used.
+     *
+     * @param l Long increment amount
+     * @throws AlreadySetException if value was previously set in transaction
+     * @throws IllegalStateException if this mutator was already used
+     */
+    public void increment(long l) throws AlreadySetException {
+      checkNotSet();
+      Bytes val = tx.get(data.row, data.getCol());
+      long v = 0;
+      if (val != null) {
+        v = encoder.decodeLong(val);
+      }
+      tx.set(data.row, data.getCol(), encoder.encode(v + l));
+      // An increment writes the cell, so it must consume the mutator just like set() does.
+      set = true;
+    }
+
+    /**
+     * Deletes the row/column and marks this mutator as used.
+     */
+    public void delete() throws AlreadySetException {
+      checkNotSet();
+      tx.delete(data.row, data.getCol());
+      set = true;
+    }
+
+    /**
+     * Sets a weak notification on the row/column and marks this mutator as used.
+     */
+    public void weaklyNotify() {
+      checkNotSet();
+      tx.setWeakNotification(data.row, data.getCol());
+      set = true;
+    }
+
+  }
+
+  /**
+   * A {@link Mutator} that can additionally be given a visibility. Every {@code vis} overload
+   * first checks that this mutator is unused, then stores the visibility in the shared
+   * {@link Data} and returns a new {@link Mutator} bound to that same data object.
+   *
+   * @since 1.0.0
+   */
+  public class VisibilityMutator extends Mutator {
+
+    VisibilityMutator(Data data) {
+      super(data);
+    }
+
+    public Mutator vis(String cv) {
+      checkNotSet();
+      return withVisibility(Bytes.of(cv));
+    }
+
+    public Mutator vis(Bytes cv) {
+      checkNotSet();
+      return withVisibility(cv);
+    }
+
+    public Mutator vis(byte[] cv) {
+      checkNotSet();
+      return withVisibility(Bytes.of(cv));
+    }
+
+    public Mutator vis(ByteBuffer cv) {
+      checkNotSet();
+      return withVisibility(Bytes.of(cv));
+    }
+
+    // Stores the visibility on the shared data, then wraps that data in a fresh Mutator.
+    private Mutator withVisibility(Bytes visibility) {
+      data.vis = visibility;
+      return new Mutator(data);
+    }
+  }
+
+ /**
+ * Qualifier step of the typed write builder. It extends {@link QualifierMethods} and supplies a
+ * {@link VisibilityMutator} for the {@link Data} it is handed.
+ *
+ * @since 1.0.0
+ */
+ public class MutatorQualifierMethods extends QualifierMethods<VisibilityMutator> {
+
+ MutatorQualifierMethods(Data data) {
+ tl.super(data); // the superclass is an inner class of TypeLayer, so it is bound to tl
+ }
+
+ @Override
+ VisibilityMutator create(Data data) {
+ return new VisibilityMutator(data);
+ }
+ }
+
+ /**
+ * Family step of the typed write builder. It extends {@link FamilyMethods} and supplies either
+ * the qualifier step ({@link MutatorQualifierMethods}) or a {@link Mutator} for the {@link Data}
+ * it is handed.
+ *
+ * @since 1.0.0
+ */
+ public class MutatorFamilyMethods extends FamilyMethods<MutatorQualifierMethods, Mutator> {
+
+ MutatorFamilyMethods(Data data) {
+ tl.super(data); // the superclass is an inner class of TypeLayer, so it is bound to tl
+ }
+
+ @Override
+ MutatorQualifierMethods create1(Data data) {
+ return new MutatorQualifierMethods(data);
+ }
+
+ @Override
+ Mutator create2(Data data) {
+ return new Mutator(data);
+ }
+ }
+
+ /**
+ * Entry step of the typed write builder, returned by {@code mutate()}. It extends
+ * {@link RowMethods} and supplies a {@link MutatorFamilyMethods} for the {@link Data} it is
+ * handed.
+ *
+ * @since 1.0.0
+ */
+ public class MutatorRowMethods extends RowMethods<MutatorFamilyMethods> {
+
+ MutatorRowMethods() {
+ tl.super(); // the superclass is an inner class of TypeLayer, so it is bound to tl
+ }
+
+ @Override
+ MutatorFamilyMethods create(Data data) {
+ return new MutatorFamilyMethods(data);
+ }
+
+ }
+
+ @VisibleForTesting
+ protected TypedTransactionBase(TransactionBase tx, Encoder encoder, TypeLayer tl) {
+ super(tx, encoder, tl); // the transaction also serves as the snapshot for inherited reads
+ this.tx = tx;
+ this.encoder = encoder; // own copies: the superclass keeps its fields private
+ this.tl = tl;
+ }
+
+ public MutatorRowMethods mutate() { // starting point of the typed write builder
+ return new MutatorRowMethods();
+ }
+
+ @Override
+ public void set(Bytes row, Column col, Bytes value) throws AlreadySetException { // forwards to tx
+ tx.set(row, col, value);
+ }
+
+ @Override
+ public void set(String row, Column col, String value) throws AlreadySetException { // forwards to tx
+ tx.set(row, col, value);
+ }
+
+ @Override
+ public void setWeakNotification(Bytes row, Column col) { // forwards to tx
+ tx.setWeakNotification(row, col);
+ }
+
+ @Override
+ public void setWeakNotification(String row, Column col) { // forwards to tx
+ tx.setWeakNotification(row, col);
+ }
+
+ @Override
+ public void delete(Bytes row, Column col) throws AlreadySetException { // forwards to tx
+ tx.delete(row, col);
+ }
+
+ @Override
+ public void delete(String row, Column col) { // forwards to tx
+ tx.delete(row, col);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java b/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java
deleted file mode 100644
index 2501fa1..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.data;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.hash.Hashing;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.BytesBuilder;
-import org.apache.fluo.recipes.common.Pirtos;
-
-/**
- * This recipe provides code to help add a hash of the row as a prefix of the row. Using this recipe
- * rows are structured like the following.
- *
- * <p>
- * {@code <prefix>:<fixed len row hash>:<user row>}
- *
- * <p>
- * The recipe also provides code to help generate split points and configure balancing of the
- * prefix.
- *
- * <p>
- * The project documentation has more information.
- */
-public class RowHasher {
-
- private static final int HASH_LEN = 4; // number of hash characters placed in each row
-
- public Pirtos getTableOptimizations(int numTablets) { // splits and grouping regex for the prefix
-
- List<Bytes> splits = new ArrayList<>(numTablets - 1);
-
- int numSplits = numTablets - 1;
- int distance = (((int) Math.pow(Character.MAX_RADIX, HASH_LEN) - 1) / numTablets) + 1; // step between splits
- int split = distance;
- for (int i = 0; i < numSplits; i++) {
- splits.add(Bytes.of(prefix
- + Strings.padStart(Integer.toString(split, Character.MAX_RADIX), HASH_LEN, '0')));
- split += distance;
- }
-
- splits.add(Bytes.of(prefix + "~")); // one more split, at prefix followed by '~'
-
-
- Pirtos pirtos = new Pirtos();
- pirtos.setSplits(splits);
- pirtos.setTabletGroupingRegex(Pattern.quote(prefix.toString()));
-
- return pirtos;
- }
-
-
- private Bytes prefix; // user supplied prefix with a trailing ':' already appended
-
- public RowHasher(String prefix) {
- this.prefix = Bytes.of(prefix + ":");
- }
-
- /**
- * Convenience overload that converts the row to {@link Bytes} and calls {@link #addHash(Bytes)}.
- *
- * @return Returns input with prefix and hash of input prepended.
- */
- public Bytes addHash(String row) {
- return addHash(Bytes.of(row));
- }
-
- /**
- * @return Returns input with prefix and hash of input prepended.
- */
- public Bytes addHash(Bytes row) {
- BytesBuilder builder = Bytes.newBuilder(prefix.length() + 5 + row.length()); // 5 = hash plus ':'
- builder.append(prefix);
- builder.append(genHash(row));
- builder.append(":");
- builder.append(row);
- return builder.toBytes();
- }
-
- private boolean hasHash(Bytes row) { // true when a well formed hash section follows the prefix
- for (int i = prefix.length(); i < prefix.length() + HASH_LEN; i++) {
- byte b = row.byteAt(i);
- boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9'); // lower case letters and digits only
- if (!isAlphaNum) {
- return false;
- }
- }
-
- if (row.byteAt(prefix.length() - 1) != ':' || row.byteAt(prefix.length() + HASH_LEN) != ':') {
- return false;
- }
-
- return true;
- }
-
- /**
- * Validates the length, the prefix and the hash section of the row before stripping them; a row
- * failing any of these checks is rejected through {@link Preconditions#checkArgument}.
- *
- * @return Returns input with prefix and hash stripped from beginning.
- */
- public Bytes removeHash(Bytes row) {
- Preconditions.checkArgument(row.length() >= prefix.length() + 5,
- "Row is shorter than expected " + row);
- Preconditions.checkArgument(row.subSequence(0, prefix.length()).equals(prefix),
- "Row does not have expected prefix " + row);
- Preconditions.checkArgument(hasHash(row), "Row does not have expected hash " + row);
- return row.subSequence(prefix.length() + 5, row.length());
- }
-
- private static String genHash(Bytes row) { // HASH_LEN character hash of the row
- int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt();
- hash = hash & 0x7fffffff; // clear the sign bit so the value is non-negative
- // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is
- // nice for debugging.
- String hashString =
- Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0');
- hashString = hashString.substring(hashString.length() - HASH_LEN); // keep the last HASH_LEN characters
-
- return hashString;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java
deleted file mode 100644
index c477ab1..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Objects;
-
-public class Export<K, V> { // key/value pair; both fields are final and set once
- private final K key;
- private final V value;
-
- public Export(K key, V val) { // throws NullPointerException when either argument is null
- Objects.requireNonNull(key);
- Objects.requireNonNull(val);
- this.key = key;
- this.value = val;
- }
-
- public K getKey() { // the key given to the constructor
- return key;
- }
-
- public V getValue() { // the value given to the constructor
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportBucket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportBucket.java
deleted file mode 100644
index fa9bb45..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportBucket.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import com.google.common.base.Preconditions;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.recipes.impl.BucketUtil;
-import org.apache.fluo.recipes.types.StringEncoder;
-import org.apache.fluo.recipes.types.TypeLayer;
-import org.apache.fluo.recipes.types.TypedTransactionBase;
-
-/**
- * This class encapsulates a buckets serialization code.
- */
-class ExportBucket {
- private static final String NOTIFICATION_CF = "fluoRecipes";
- private static final String NOTIFICATION_CQ_PREFIX = "eq:";
- private static final Column EXPORT_COL = new Column("e", "v");
- private static final Column NEXT_COL = new Column("e", "next");
-
- static Column newNotificationColumn(String queueId) {
- return new Column(NOTIFICATION_CF, NOTIFICATION_CQ_PREFIX + queueId);
- }
-
- private final TypedTransactionBase ttx;
- private final String qid;
- private final Bytes bucketRow;
-
- static Bytes generateBucketRow(String qid, int bucket, int numBuckets) {
- return Bytes.of(qid + ":" + BucketUtil.genBucketId(bucket, numBuckets));
- }
-
- ExportBucket(TransactionBase tx, String qid, int bucket, int numBuckets) {
- // TODO encode in a more robust way... but for now fail early
- Preconditions.checkArgument(!qid.contains(":"), "Export QID can not contain :");
- this.ttx = new TypeLayer(new StringEncoder()).wrap(tx);
- this.qid = qid;
- this.bucketRow = generateBucketRow(qid, bucket, numBuckets);
- }
-
- ExportBucket(TransactionBase tx, Bytes bucketRow) {
- this.ttx = new TypeLayer(new StringEncoder()).wrap(tx);
-
- int colonLoc = -1;
-
- for (int i = 0; i < bucketRow.length(); i++) {
- if (bucketRow.byteAt(i) == ':') {
- colonLoc = i;
- break;
- }
- }
-
- Preconditions.checkArgument(colonLoc != -1 && colonLoc != bucketRow.length(),
- "Invalid bucket row " + bucketRow);
- Preconditions.checkArgument(bucketRow.byteAt(bucketRow.length() - 1) == ':',
- "Invalid bucket row " + bucketRow);
-
- this.bucketRow = bucketRow.subSequence(0, bucketRow.length() - 1);
- this.qid = bucketRow.subSequence(0, colonLoc).toString();
- }
-
- private static byte[] encSeq(long l) {
- byte[] ret = new byte[8];
- ret[0] = (byte) (l >>> 56);
- ret[1] = (byte) (l >>> 48);
- ret[2] = (byte) (l >>> 40);
- ret[3] = (byte) (l >>> 32);
- ret[4] = (byte) (l >>> 24);
- ret[5] = (byte) (l >>> 16);
- ret[6] = (byte) (l >>> 8);
- ret[7] = (byte) (l >>> 0);
- return ret;
- }
-
- private static long decodeSeq(Bytes seq) {
- return (((long) seq.byteAt(0) << 56) + ((long) (seq.byteAt(1) & 255) << 48)
- + ((long) (seq.byteAt(2) & 255) << 40) + ((long) (seq.byteAt(3) & 255) << 32)
- + ((long) (seq.byteAt(4) & 255) << 24) + ((seq.byteAt(5) & 255) << 16)
- + ((seq.byteAt(6) & 255) << 8) + ((seq.byteAt(7) & 255) << 0));
- }
-
-
- public void add(long seq, byte[] key, byte[] value) {
- Bytes row =
- Bytes.newBuilder(bucketRow.length() + 1 + key.length + 8).append(bucketRow).append(":")
- .append(key).append(encSeq(seq)).toBytes();
- ttx.set(row, EXPORT_COL, Bytes.of(value));
- }
-
- /**
- * Computes the minimial row for a bucket
- */
- private Bytes getMinimalRow() {
- return Bytes.newBuilder(bucketRow.length() + 1).append(bucketRow).append(":").toBytes();
- }
-
- public void notifyExportObserver() {
- ttx.mutate().row(getMinimalRow()).col(newNotificationColumn(qid)).weaklyNotify();
- }
-
- public Iterator<ExportEntry> getExportIterator(Bytes continueRow) {
- ScannerConfiguration sc = new ScannerConfiguration();
-
- if (continueRow != null) {
- Span tmpSpan = Span.prefix(bucketRow);
- Span nextSpan =
- new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(),
- tmpSpan.isEndInclusive());
- sc.setSpan(nextSpan);
- } else {
- sc.setSpan(Span.prefix(bucketRow));
- }
-
- sc.fetchColumn(EXPORT_COL.getFamily(), EXPORT_COL.getQualifier());
- RowIterator iter = ttx.get(sc);
-
- if (iter.hasNext()) {
- return new ExportIterator(iter);
- } else {
- return Collections.<ExportEntry>emptySet().iterator();
- }
- }
-
- private class ExportIterator implements Iterator<ExportEntry> {
-
- private RowIterator rowIter;
- private Bytes lastRow;
-
- public ExportIterator(RowIterator rowIter) {
- this.rowIter = rowIter;
- }
-
- @Override
- public boolean hasNext() {
- return rowIter.hasNext();
- }
-
- @Override
- public ExportEntry next() {
- Entry<Bytes, ColumnIterator> rowCol = rowIter.next();
- Bytes row = rowCol.getKey();
-
- Bytes keyBytes = row.subSequence(bucketRow.length() + 1, row.length() - 8);
- Bytes seqBytes = row.subSequence(row.length() - 8, row.length());
-
- ExportEntry ee = new ExportEntry();
-
- ee.key = keyBytes.toArray();
- ee.seq = decodeSeq(seqBytes);
- // TODO maybe leave as Bytes?
- ee.value = rowCol.getValue().next().getValue().toArray();
-
- lastRow = row;
-
- return ee;
- }
-
- @Override
- public void remove() {
- ttx.mutate().row(lastRow).col(EXPORT_COL).delete();
- }
- }
-
- public Bytes getContinueRow() {
- return ttx.get(getMinimalRow(), NEXT_COL);
- }
-
- public void setContinueRow(ExportEntry ee) {
- Bytes nextRow =
- Bytes.newBuilder(bucketRow.length() + 1 + ee.key.length + 8).append(bucketRow).append(":")
- .append(ee.key).append(encSeq(ee.seq)).toBytes();
-
- ttx.set(getMinimalRow(), NEXT_COL, nextRow);
- }
-
- public void clearContinueRow() {
- ttx.delete(getMinimalRow(), NEXT_COL);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java
deleted file mode 100644
index 1b156b9..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-class ExportEntry {
- byte[] key;
- long seq;
- byte[] value;
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java
deleted file mode 100644
index 972af6e..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import com.google.common.collect.Iterators;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-
-public class ExportObserver<K, V> extends AbstractObserver {
-
- private static class MemLimitIterator implements Iterator<ExportEntry> {
-
- private long memConsumed = 0;
- private long memLimit;
- private int extraPerKey;
- private Iterator<ExportEntry> source;
-
- public MemLimitIterator(Iterator<ExportEntry> input, long limit, int extraPerKey) {
- this.source = input;
- this.memLimit = limit;
- this.extraPerKey = extraPerKey;
- }
-
- @Override
- public boolean hasNext() {
- return memConsumed < memLimit && source.hasNext();
- }
-
- @Override
- public ExportEntry next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- ExportEntry ee = source.next();
- memConsumed += ee.key.length + extraPerKey + ee.value.length;
- return ee;
- }
-
- @Override
- public void remove() {
- source.remove();
- }
- }
-
- private String queueId;
- private Class<K> keyType;
- private Class<V> valType;
- SimpleSerializer serializer;
- private Exporter<K, V> exporter;
-
- private long memLimit;
-
- protected String getQueueId() {
- return queueId;
- }
-
- SimpleSerializer getSerializer() {
- return serializer;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(Context context) throws Exception {
- queueId = context.getParameters().get("queueId");
- ExportQueue.Options opts = new ExportQueue.Options(queueId, context.getAppConfiguration());
-
- // TODO defer loading classes... so that not done during fluo init
- // TODO move class loading to centralized place... also attempt to check type params
- keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType);
- valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType);
- exporter =
- getClass().getClassLoader().loadClass(opts.exporterType).asSubclass(Exporter.class)
- .newInstance();
-
- serializer = SimpleSerializer.getInstance(context.getAppConfiguration());
-
- memLimit = opts.getBufferSize();
-
- exporter.init(queueId, context);
- }
-
- @Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(ExportBucket.newNotificationColumn(queueId), NotificationType.WEAK);
- }
-
- @Override
- public void process(TransactionBase tx, Bytes row, Column column) throws Exception {
- ExportBucket bucket = new ExportBucket(tx, row);
-
- Bytes continueRow = bucket.getContinueRow();
-
- Iterator<ExportEntry> input = bucket.getExportIterator(continueRow);
- MemLimitIterator memLimitIter = new MemLimitIterator(input, memLimit, 8 + queueId.length());
-
- Iterator<SequencedExport<K, V>> exportIterator =
- Iterators.transform(
- memLimitIter,
- ee -> new SequencedExport<>(serializer.deserialize(ee.key, keyType), serializer
- .deserialize(ee.value, valType), ee.seq));
-
- exportIterator = Iterators.consumingIterator(exportIterator);
-
- exporter.processExports(exportIterator);
-
- if (input.hasNext()) {
- // not everything was processed so notify self
- bucket.notifyExportObserver();
-
- if (!memLimitIter.hasNext()) {
- // stopped because of mem limit... set continue key
- bucket.setContinueRow(input.next());
- continueRow = null;
- }
- }
-
- if (continueRow != null) {
- bucket.clearContinueRow();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java
deleted file mode 100644
index 13518e7..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import com.google.common.hash.Hashing;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.recipes.common.Pirtos;
-import org.apache.fluo.recipes.common.RowRange;
-import org.apache.fluo.recipes.common.TransientRegistry;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-
-public class ExportQueue<K, V> {
-
- private static final String RANGE_BEGIN = "#";
- private static final String RANGE_END = ":~";
-
- private int numBuckets;
- private SimpleSerializer serializer;
- private String queueId;
-
- // usage hint : could be created once in an observers init method
- // usage hint : maybe have a queue for each type of data being exported???
- // maybe less queues are
- // more efficient though because more batching at export time??
- ExportQueue(Options opts, SimpleSerializer serializer) throws Exception {
- // TODO sanity check key type based on type params
- // TODO defer creating classes until needed.. so that its not done during Fluo init
- this.queueId = opts.queueId;
- this.numBuckets = opts.numBuckets;
- this.serializer = serializer;
- }
-
- public void add(TransactionBase tx, K key, V value) {
- addAll(tx, Collections.singleton(new Export<>(key, value)).iterator());
- }
-
- public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) {
-
- Set<Integer> bucketsNotified = new HashSet<>();
- while (exports.hasNext()) {
- Export<K, V> export = exports.next();
-
- byte[] k = serializer.serialize(export.getKey());
- byte[] v = serializer.serialize(export.getValue());
-
- int hash = Hashing.murmur3_32().hashBytes(k).asInt();
- int bucketId = Math.abs(hash % numBuckets);
-
- ExportBucket bucket = new ExportBucket(tx, queueId, bucketId, numBuckets);
- bucket.add(tx.getStartTimestamp(), k, v);
-
- if (!bucketsNotified.contains(bucketId)) {
- bucket.notifyExportObserver();
- bucketsNotified.add(bucketId);
- }
- }
- }
-
- public static <K2, V2> ExportQueue<K2, V2> getInstance(String exportQueueId,
- SimpleConfiguration appConfig) {
- Options opts = new Options(exportQueueId, appConfig);
- try {
- return new ExportQueue<>(opts, SimpleSerializer.getInstance(appConfig));
- } catch (Exception e) {
- // TODO
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Call this method before initializing Fluo.
- *
- * @param fluoConfig The configuration that will be used to initialize fluo.
- */
- public static void configure(FluoConfiguration fluoConfig, Options opts) {
- SimpleConfiguration appConfig = fluoConfig.getAppConfiguration();
- opts.save(appConfig);
-
- fluoConfig.addObserver(new ObserverConfiguration(ExportObserver.class.getName())
- .setParameters(Collections.singletonMap("queueId", opts.queueId)));
-
- Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
- Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
-
- new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("exportQueue."
- + opts.queueId, new RowRange(exportRangeStart, exportRangeStop));
- }
-
- /**
- * Return suggested Fluo table optimizations for all previously configured export queues.
- *
- * @param appConfig Must pass in the application configuration obtained from
- * {@code FluoClient.getAppConfiguration()} or
- * {@code FluoConfiguration.getAppConfiguration()}
- */
-
- public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
- HashSet<String> queueIds = new HashSet<>();
- appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining(
- k -> queueIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0]));
-
- Pirtos pirtos = new Pirtos();
- queueIds.forEach(qid -> pirtos.merge(getTableOptimizations(qid, appConfig)));
-
- return pirtos;
- }
-
- /**
- * Return suggested Fluo table optimizations for the specified export queue.
- *
- * @param appConfig Must pass in the application configuration obtained from
- * {@code FluoClient.getAppConfiguration()} or
- * {@code FluoConfiguration.getAppConfiguration()}
- */
- public static Pirtos getTableOptimizations(String queueId, SimpleConfiguration appConfig) {
- Options opts = new Options(queueId, appConfig);
-
- List<Bytes> splits = new ArrayList<>();
-
- Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN);
- Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END);
-
- splits.add(exportRangeStart);
- splits.add(exportRangeStop);
-
- List<Bytes> exportSplits = new ArrayList<>();
- for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
- exportSplits.add(ExportBucket.generateBucketRow(opts.queueId, i, opts.numBuckets));
- }
- Collections.sort(exportSplits);
- splits.addAll(exportSplits);
-
- Pirtos pirtos = new Pirtos();
- pirtos.setSplits(splits);
-
- // the tablet with end row <queueId># does not contain any data for the export queue and
- // should not be grouped with the export queue
- pirtos.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
-
- return pirtos;
- }
-
- public static class Options {
-
- private static final String PREFIX = "recipes.exportQueue.";
- static final long DEFAULT_BUFFER_SIZE = 1 << 20;
- static final int DEFAULT_BUCKETS_PER_TABLET = 10;
-
- int numBuckets;
- Integer bucketsPerTablet = null;
- Long bufferSize;
-
- String keyType;
- String valueType;
- String exporterType;
- String queueId;
-
- Options(String queueId, SimpleConfiguration appConfig) {
- this.queueId = queueId;
-
- this.numBuckets = appConfig.getInt(PREFIX + queueId + ".buckets");
- this.exporterType = appConfig.getString(PREFIX + queueId + ".exporter");
- this.keyType = appConfig.getString(PREFIX + queueId + ".key");
- this.valueType = appConfig.getString(PREFIX + queueId + ".val");
- this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE);
- this.bucketsPerTablet =
- appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
- }
-
- public Options(String queueId, String exporterType, String keyType, String valueType,
- int buckets) {
- Preconditions.checkArgument(buckets > 0);
-
- this.queueId = queueId;
- this.numBuckets = buckets;
- this.exporterType = exporterType;
- this.keyType = keyType;
- this.valueType = valueType;
- }
-
-
- public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporter,
- Class<K> keyType, Class<V> valueType, int buckets) {
- this(queueId, exporter.getName(), keyType.getName(), valueType.getName(), buckets);
- }
-
- /**
- * Sets a limit on the amount of serialized updates to read into memory. Additional memory will
- * be used to actually deserialize and process the updates. This limit does not account for
- * object overhead in java, which can be significant.
- *
- * <p>
- * The way memory read is calculated is by summing the length of serialized key and value byte
- * arrays. Once this sum exceeds the configured memory limit, no more export key values are
- * processed in the current transaction. When not everything is processed, the observer
- * processing exports will notify itself causing another transaction to continue processing
- * later.
- */
- public Options setBufferSize(long bufferSize) {
- Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
- this.bufferSize = bufferSize;
- return this;
- }
-
- long getBufferSize() {
- if (bufferSize == null) {
- return DEFAULT_BUFFER_SIZE;
- }
-
- return bufferSize;
- }
-
- /**
- * Sets the number of buckets per tablet to generate. This affects how many split points will be
- * generated when optimizing the Accumulo table.
- *
- */
- public Options setBucketsPerTablet(int bucketsPerTablet) {
- Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
- + bucketsPerTablet);
- this.bucketsPerTablet = bucketsPerTablet;
- return this;
- }
-
- int getBucketsPerTablet() {
- if (bucketsPerTablet == null) {
- return DEFAULT_BUCKETS_PER_TABLET;
- }
-
- return bucketsPerTablet;
- }
-
- void save(SimpleConfiguration appConfig) {
- appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + "");
- appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + "");
- appConfig.setProperty(PREFIX + queueId + ".key", keyType);
- appConfig.setProperty(PREFIX + queueId + ".val", valueType);
-
- if (bufferSize != null) {
- appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize);
- }
- if (bucketsPerTablet != null) {
- appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java
deleted file mode 100644
index b81e9d1..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-import java.util.Iterator;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-public abstract class Exporter<K, V> {
-
- public void init(String queueId, Context observerContext) throws Exception {}
-
- /**
- * Must be able to handle same key being exported multiple times and key being exported out of
- * order. The sequence number is meant to help with this.
- *
- * <p>
- * If multiple export entries with the same key are passed in, then the entries with the same key
- * will be consecutive and in ascending sequence order.
- *
- * <p>
- * If the call to process exports is unexpectedly terminated, it will be called again later with
- * at least the same data. For example suppose an exporter was passed the following entries.
- *
- * <ul>
- * <li>key=0 sequence=9 value=abc
- * <li>key=1 sequence=13 value=d
- * <li>key=1 sequence=17 value=e
- * <li>key=1 sequence=23 value=f
- * <li>key=2 sequence=19 value=x
- * </ul>
- *
- * <p>
- * Assume the exporter exports some of these and then fails before completing all of them. The
- * next time its called it will be passed what it saw before, but it could also be passed more.
- *
- * <ul>
- * <li>key=0 sequence=9 value=abc
- * <li>key=1 sequence=13 value=d
- * <li>key=1 sequence=17 value=e
- * <li>key=1 sequence=23 value=f
- * <li>key=1 sequence=29 value=g
- * <li>key=2 sequence=19 value=x
- * <li>key=2 sequence=77 value=y
- * </ul>
- *
- */
- protected abstract void processExports(Iterator<SequencedExport<K, V>> exports);
-
- // TODO add close
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java
deleted file mode 100644
index a862a8e..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.export;
-
-public class SequencedExport<K, V> extends Export<K, V> {
- private final long seq;
-
- SequencedExport(K k, V v, long seq) {
- super(k, v);
- this.seq = seq;
- }
-
- public long getSequence() {
- return seq;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java b/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java
deleted file mode 100644
index ded289c..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.impl;
-
-public class BucketUtil {
- public static String genBucketId(int bucket, int maxBucket) {
- int bucketLen = Integer.toHexString(maxBucket).length();
- // TODO printf is slow
- return String.format("%0" + bucketLen + "x", bucket);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java
deleted file mode 100644
index bc7bffd..0000000
--- a/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java
+++ /dev/null
@@ -1,657 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.map;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-import com.google.common.hash.Hashing;
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.BytesBuilder;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.recipes.common.Pirtos;
-import org.apache.fluo.recipes.common.RowRange;
-import org.apache.fluo.recipes.common.TransientRegistry;
-import org.apache.fluo.recipes.impl.BucketUtil;
-import org.apache.fluo.recipes.serialization.SimpleSerializer;
-
-/**
- * See the project level documentation for information about this recipe.
- */
-public class CollisionFreeMap<K, V> {
-
- // Suffix appended to a map id to form the row that ends the map's update ("u") rows. Used when
- // registering the transient range and when generating table splits.
- private static final String UPDATE_RANGE_END = ":u:~";
-
- // Suffix appended to a map id to form the row that ends the map's data ("d") rows.
- private static final String DATA_RANGE_END = ":d:~";
-
- // Identifier of this map; every row written for the map starts with it.
- private String mapId;
-
- // Key and value classes handed to the serializer when deserializing.
- private Class<K> keyType;
- private Class<V> valType;
- private SimpleSerializer serializer;
- // Combines queued updates (and the current value, when present) into a new value.
- private Combiner<K, V> combiner;
- // Told about values that changed while processing updates; never null after construction.
- UpdateObserver<K, V> updateObserver;
- // Approximate limit, in bytes of serialized rows and values, read by one call to process().
- private long bufferSize;
-
- // Column in which queued update values are stored.
- static final Column UPDATE_COL = new Column("u", "v");
- // Column on the notified row that records where the next call to process() should resume.
- static final Column NEXT_COL = new Column("u", "next");
-
- // Number of buckets keys are hashed into; -1 until assigned by the constructor.
- private int numBuckets = -1;
-
-  /**
-   * Creates a map from the given options, loading the key, value, combiner and (when configured)
-   * update observer classes by name through this class's class loader. When no update observer is
-   * configured a {@link NullUpdateObserver} is used.
-   *
-   * @param opts options naming the map id, bucket count, buffer size and the classes to load
-   * @param serializer serializer used for keys and values
-   * @throws Exception if a class cannot be loaded or instantiated
-   */
-  @SuppressWarnings("unchecked")
-  CollisionFreeMap(Options opts, SimpleSerializer serializer) throws Exception {
-    // TODO defer loading classes
-    // TODO centralize class loading
-    // TODO try to check type params
-    final ClassLoader loader = getClass().getClassLoader();
-
-    this.mapId = opts.mapId;
-    this.numBuckets = opts.numBuckets;
-    this.keyType = (Class<K>) loader.loadClass(opts.keyType);
-    this.valType = (Class<V>) loader.loadClass(opts.valueType);
-    this.combiner = (Combiner<K, V>) loader.loadClass(opts.combinerType).newInstance();
-    this.serializer = serializer;
-
-    if (opts.updateObserverType == null) {
-      this.updateObserver = new NullUpdateObserver<>();
-    } else {
-      Class<?> observerClass = loader.loadClass(opts.updateObserverType);
-      this.updateObserver = observerClass.asSubclass(UpdateObserver.class).newInstance();
-    }
-
-    this.bufferSize = opts.getBufferSize();
-  }
-
-  /**
-   * Deserializes a stored value using the configured serializer and value type.
-   */
-  private V deserVal(Bytes val) {
-    byte[] raw = val.toArray();
-    return serializer.deserialize(raw, valType);
-  }
-
-  /**
-   * Extracts the serialized key from an update row by dropping the leading {@code prefix} and the
-   * trailing 8 bytes. Update rows written by {@code update()} end with the 8 byte encoded start
-   * timestamp of the transaction that queued them.
-   */
-  private Bytes getKeyFromUpdateRow(Bytes prefix, Bytes row) {
-    int keyStart = prefix.length();
-    int keyEnd = row.length() - 8;
-    return row.subSequence(keyStart, keyEnd);
-  }
-
-  /**
-   * Processes the queued updates stored beneath one notified row. Update rows under
-   * {@code ntfyRow} are read until the scan ends or roughly {@code bufferSize} bytes of serialized
-   * rows and values have been read. Every update read is deleted, the updates for each key are
-   * combined with the key's current value, and the result is written to (or deleted from) the
-   * key's data row. Changed values are reported to the update observer.
-   *
-   * <p>
-   * When the scan is cut short, a resume key is stored in {@link #NEXT_COL} and a weak
-   * notification is set on {@code ntfyRow} so a later transaction continues where this one
-   * stopped. If the updates for the last key were only partially read, their combined value is
-   * queued again as a new update instead of being applied to the data row.
-   *
-   * @param tx transaction used for all reads and writes
-   * @param ntfyRow the notified row, which is also the prefix of the update rows to scan
-   * @param col column used when this method notifies itself
-   * @throws Exception propagated from the calls made while processing
-   */
-  void process(TransactionBase tx, Bytes ntfyRow, Column col) throws Exception {
-
-    Bytes nextKey = tx.get(ntfyRow, NEXT_COL);
-
-    ScannerConfiguration sc = new ScannerConfiguration();
-
-    if (nextKey != null) {
-      // an earlier transaction stopped early, so start scanning at the key it recorded
-      Bytes startRow =
-          Bytes.newBuilder(ntfyRow.length() + nextKey.length()).append(ntfyRow).append(nextKey)
-              .toBytes();
-      Span tmpSpan = Span.prefix(ntfyRow);
-      Span nextSpan =
-          new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(),
-              tmpSpan.isEndInclusive());
-      sc.setSpan(nextSpan);
-    } else {
-      sc.setSpan(Span.prefix(ntfyRow));
-    }
-
-    // The span chosen above must not be overwritten here. Resetting it to the whole prefix made
-    // the resume key stored in NEXT_COL have no effect.
-    // NOTE(review): when a resumed scan reaches the end of the prefix, rows sorting before the
-    // resume key are not visited by this transaction - confirm a later notification covers them.
-    sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier());
-    RowIterator iter = tx.get(sc);
-
-    Map<Bytes, List<Bytes>> updates = new HashMap<>();
-
-    long approxMemUsed = 0;
-
-    Bytes partiallyReadKey = null;
-
-    if (iter.hasNext()) {
-      Bytes lastKey = null;
-      while (iter.hasNext() && approxMemUsed < bufferSize) {
-        Entry<Bytes, ColumnIterator> rowCol = iter.next();
-        Bytes curRow = rowCol.getKey();
-
-        // the update is consumed by this transaction
-        tx.delete(curRow, UPDATE_COL);
-
-        Bytes serializedKey = getKeyFromUpdateRow(ntfyRow, curRow);
-        lastKey = serializedKey;
-
-        Bytes val = rowCol.getValue().next().getValue();
-        updates.computeIfAbsent(serializedKey, k -> new ArrayList<>()).add(val);
-
-        approxMemUsed += curRow.length();
-        approxMemUsed += val.length();
-      }
-
-      if (iter.hasNext()) {
-        Entry<Bytes, ColumnIterator> rowCol = iter.next();
-        Bytes curRow = rowCol.getKey();
-
-        // check if more updates for last key
-        if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) {
-          // there are still more updates for this key
-          partiallyReadKey = lastKey;
-
-          // start next time at the current key
-          tx.set(ntfyRow, NEXT_COL, partiallyReadKey);
-        } else {
-          // start next time at the next possible key
-          Bytes nextPossible =
-              Bytes.newBuilder(lastKey.length() + 1).append(lastKey).append(new byte[] {0})
-                  .toBytes();
-          tx.set(ntfyRow, NEXT_COL, nextPossible);
-        }
-
-        // may not read all data because of mem limit, so notify self
-        tx.setWeakNotification(ntfyRow, col);
-      } else if (nextKey != null) {
-        // clear nextKey
-        tx.delete(ntfyRow, NEXT_COL);
-      }
-    } else if (nextKey != null) {
-      tx.delete(ntfyRow, NEXT_COL);
-    }
-
-    // Derive the data row prefix from the notified row by overwriting the byte that follows the
-    // map id and its ':' separator (the 'u' of an update row) with 'd'.
-    byte[] dataPrefix = ntfyRow.toArray();
-    // TODO this is awful... no sanity check... hard to read
-    dataPrefix[Bytes.of(mapId).length() + 1] = 'd';
-
-    BytesBuilder rowBuilder = Bytes.newBuilder();
-    rowBuilder.append(dataPrefix);
-    int rowPrefixLen = rowBuilder.getLength();
-
-    // the current value of a partially read key is not needed, its updates are queued again
-    Set<Bytes> keysToFetch = updates.keySet();
-    if (partiallyReadKey != null) {
-      final Bytes prk = partiallyReadKey;
-      keysToFetch = Sets.filter(keysToFetch, b -> !b.equals(prk));
-    }
-    Map<Bytes, Map<Column, Bytes>> currentVals = getCurrentValues(tx, rowBuilder, keysToFetch);
-
-    ArrayList<Update<K, V>> updatesToReport = new ArrayList<>(updates.size());
-
-    for (Entry<Bytes, List<Bytes>> entry : updates.entrySet()) {
-      rowBuilder.setLength(rowPrefixLen);
-      Bytes currentValueRow = rowBuilder.append(entry.getKey()).toBytes();
-      Bytes currVal =
-          currentVals.getOrDefault(currentValueRow, Collections.emptyMap()).get(DATA_COLUMN);
-
-      Iterator<V> ui = Iterators.transform(entry.getValue().iterator(), this::deserVal);
-
-      K kd = serializer.deserialize(entry.getKey().toArray(), keyType);
-
-      if (partiallyReadKey != null && partiallyReadKey.equals(entry.getKey())) {
-        // not all updates were read for this key, so requeue the combined updates as an update
-        Optional<V> nv = combiner.combine(kd, ui);
-        if (nv.isPresent()) {
-          update(tx, Collections.singletonMap(kd, nv.get()));
-        }
-      } else {
-        Optional<V> nv = combiner.combine(kd, concat(ui, currVal));
-        Bytes newVal = nv.isPresent() ? Bytes.of(serializer.serialize(nv.get())) : null;
-        // only write and report when the serialized value actually changed
-        if (newVal != null ^ currVal != null || (currVal != null && !currVal.equals(newVal))) {
-          if (newVal == null) {
-            tx.delete(currentValueRow, DATA_COLUMN);
-          } else {
-            tx.set(currentValueRow, DATA_COLUMN, newVal);
-          }
-
-          Optional<V> cvd = Optional.ofNullable(currVal).map(this::deserVal);
-          updatesToReport.add(new Update<>(kd, cvd, nv));
-        }
-      }
-    }
-
-    // TODO could clear these as converted to objects to avoid double memory usage
-    updates.clear();
-    currentVals.clear();
-
-    if (!updatesToReport.isEmpty()) {
-      updateObserver.updatingValues(tx, updatesToReport.iterator());
-    }
-  }
-
- // Column of a data row in which the current value for a key is stored.
- private static final Column DATA_COLUMN = new Column("data", "current");
-
-  /**
-   * Reads the current values for a set of serialized keys in one call.
-   *
-   * @param tx transaction to read with
-   * @param prefix builder holding the data row prefix; each key is appended to it in turn, so on
-   *        return it holds the row of the last key and the caller must reset its length
-   * @param keySet serialized keys whose data rows should be read
-   * @return the map returned by {@code tx.get()} for the data rows, restricted to
-   *         {@code DATA_COLUMN}
-   * @throws IllegalArgumentException if the read is rejected; the message reports how many rows
-   *         were requested and the original exception is attached as the cause
-   */
-  private Map<Bytes, Map<Column, Bytes>> getCurrentValues(TransactionBase tx, BytesBuilder prefix,
-      Set<Bytes> keySet) {
-
-    Set<Bytes> rows = new HashSet<>();
-
-    int prefixLen = prefix.getLength();
-    for (Bytes key : keySet) {
-      prefix.setLength(prefixLen);
-      rows.add(prefix.append(key).toBytes());
-    }
-
-    try {
-      return tx.get(rows, Collections.singleton(DATA_COLUMN));
-    } catch (IllegalArgumentException e) {
-      // keep the row count as diagnostic context instead of printing it to stdout
-      throw new IllegalArgumentException("Failed to read current values for " + rows.size()
-          + " rows", e);
-    }
-  }
-
-  /**
-   * Returns the given updates followed by the deserialized current value, or just the updates when
-   * there is no current value.
-   */
-  private Iterator<V> concat(Iterator<V> updates, Bytes currentVal) {
-    if (currentVal != null) {
-      Iterator<V> current = Iterators.singletonIterator(deserVal(currentVal));
-      return Iterators.concat(updates, current);
-    }
-
-    return updates;
-  }
-
-  /**
-   * This method will retrieve the current value for key and any outstanding updates and combine
-   * them using the configured {@link Combiner}. The result from the combiner is returned.
-   *
-   * <p>
-   * NOTE(review): outstanding updates are found by scanning the row prefix that ends with the
-   * serialized key, so rows of a different key whose serialized form starts with the same bytes
-   * would presumably match as well - confirm the serializer in use rules this out.
-   *
-   * @param tx snapshot to read from
-   * @param key key to look up
-   * @return the stored value when there are no outstanding updates, otherwise the combiner's
-   *         result; {@code null} when neither yields a value
-   */
-  public V get(SnapshotBase tx, K key) {
-
-    byte[] k = serializer.serialize(key);
-
-    int hash = Hashing.murmur3_32().hashBytes(k).asInt();
-    String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets);
-
-    BytesBuilder rowBuilder = Bytes.newBuilder();
-    rowBuilder.append(mapId);
-    // Remember how long the map id is inside the builder. The builder is truncated back to this
-    // point below, and String.length() counts chars, which need not equal the builder's length.
-    int mapIdLen = rowBuilder.getLength();
-    rowBuilder.append(":u:").append(bucketId).append(":").append(k);
-
-    ScannerConfiguration sc = new ScannerConfiguration();
-    sc.setSpan(Span.prefix(rowBuilder.toBytes()));
-
-    RowIterator iter = tx.get(sc);
-
-    Iterator<V> ui;
-
-    if (iter.hasNext()) {
-      ui = Iterators.transform(iter, e -> deserVal(e.getValue().next().getValue()));
-    } else {
-      ui = Collections.<V>emptyList().iterator();
-    }
-
-    // reuse the builder for the data row of the same key
-    rowBuilder.setLength(mapIdLen);
-    rowBuilder.append(":d:").append(bucketId).append(":").append(k);
-
-    Bytes dataRow = rowBuilder.toBytes();
-
-    Bytes cv = tx.get(dataRow, DATA_COLUMN);
-
-    if (!ui.hasNext()) {
-      // no outstanding updates, so the stored value (if any) is the answer
-      if (cv == null) {
-        return null;
-      } else {
-        return deserVal(cv);
-      }
-    }
-
-    return combiner.combine(key, concat(ui, cv)).orElse(null);
-  }
-
-  /**
-   * @return the id of this map
-   */
-  String getId() {
-    return this.mapId;
-  }
-
-  /**
-   * Queues updates for a collision free map. The queued updates are applied later by an Observer
-   * running in a different transaction, so calling this method does not collide with other
-   * transactions queuing updates for the same keys.
-   *
-   * <p>
-   * Each update is written to its own row made of the map id, the key's bucket, the serialized
-   * key and the start timestamp of {@code tx}. A weak notification is set once for every bucket
-   * that received an update.
-   *
-   * @param tx This transaction will be used to make the updates.
-   * @param updates The keys in the map should correspond to keys in the collision free map being
-   *        updated. The values in the map will be queued for updating.
-   */
-  public void update(TransactionBase tx, Map<K, V> updates) {
-    Preconditions.checkState(numBuckets > 0, "Not initialized");
-
-    BytesBuilder updateRow = Bytes.newBuilder();
-    updateRow.append(mapId).append(":u:");
-    final int commonPrefixLen = updateRow.getLength();
-
-    final byte[] encodedStartTs = encSeq(tx.getStartTimestamp());
-
-    Set<String> touchedBuckets = new HashSet<>();
-
-    for (Entry<K, V> update : updates.entrySet()) {
-      byte[] keyBytes = serializer.serialize(update.getKey());
-      int bucket = Math.abs(Hashing.murmur3_32().hashBytes(keyBytes).asInt() % numBuckets);
-      String bucketId = BucketUtil.genBucketId(bucket, numBuckets);
-
-      // rewind the builder to the shared "<mapId>:u:" prefix before adding this key
-      updateRow.setLength(commonPrefixLen);
-      updateRow.append(bucketId).append(":").append(keyBytes).append(encodedStartTs);
-      Bytes row = updateRow.toBytes();
-
-      Bytes val = Bytes.of(serializer.serialize(update.getValue()));
-
-      // TODO set if not exists would be comforting here.... but
-      // collisions on bucketId+key+uuid should never occur
-      tx.set(row, UPDATE_COL, val);
-
-      touchedBuckets.add(bucketId);
-    }
-
-    // the notification column is the same for every bucket of this map
-    final Column notifyColumn = new Column("fluoRecipes", "cfm:" + mapId);
-
-    for (String bucketId : touchedBuckets) {
-      updateRow.setLength(commonPrefixLen);
-      updateRow.append(bucketId).append(":");
-
-      tx.setWeakNotification(updateRow.toBytes(), notifyColumn);
-    }
-  }
-
-
-  /**
-   * Creates a map instance from the options stored in the application configuration for the
-   * given map id.
-   *
-   * @param mapId id of a previously configured map
-   * @param appConf application configuration holding the map's options
-   * @return a new map instance
-   * @throws RuntimeException wrapping any exception raised while creating the map
-   */
-  public static <K2, V2> CollisionFreeMap<K2, V2> getInstance(String mapId,
-      SimpleConfiguration appConf) {
-    final Options opts = new Options(mapId, appConf);
-    final CollisionFreeMap<K2, V2> cfm;
-    try {
-      cfm = new CollisionFreeMap<>(opts, SimpleSerializer.getInstance(appConf));
-    } catch (Exception e) {
-      // TODO
-      throw new RuntimeException(e);
-    }
-    return cfm;
-  }
-
-  /**
-   * A {@link CollisionFreeMap} stores data in its own format in the Fluo table. When initializing
-   * a Fluo table with something like Map Reduce or Spark, data needs to be written in this format.
-   * That is the purpose of this method: it provides a simple class that can do the conversion.
-   *
-   * @param mapId id of the map the data is for
-   * @param numBuckets number of buckets the map uses
-   * @param serializer serializer used for keys and values
-   * @return an initializer that converts key/value pairs to the map's data format
-   */
-  public static <K2, V2> Initializer<K2, V2> getInitializer(String mapId, int numBuckets,
-      SimpleSerializer serializer) {
-    Initializer<K2, V2> initializer = new Initializer<>(mapId, numBuckets, serializer);
-    return initializer;
-  }
-
-
-  /**
-   * Converts key/value pairs into the row, column and value under which a map stores its current
-   * data.
-   *
-   * @see CollisionFreeMap#getInitializer(String, int, SimpleSerializer)
-   */
-  public static class Initializer<K2, V2> implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private String mapId;
-
-    private SimpleSerializer serializer;
-
-    private int numBuckets = -1;
-
-    private Initializer(String mapId, int numBuckets, SimpleSerializer serializer) {
-      this.mapId = mapId;
-      this.numBuckets = numBuckets;
-      this.serializer = serializer;
-    }
-
-    /**
-     * Builds the data row entry for a key and value. The row is made of the map id, the key's
-     * bucket id and the serialized key.
-     *
-     * @param key key to store
-     * @param val value to store for the key
-     * @return the row, column and serialized value to write
-     */
-    public RowColumnValue convert(K2 key, V2 val) {
-      byte[] keyBytes = serializer.serialize(key);
-      int bucket = Math.abs(Hashing.murmur3_32().hashBytes(keyBytes).asInt() % numBuckets);
-      String bucketId = BucketUtil.genBucketId(bucket, numBuckets);
-
-      BytesBuilder rowBuilder = Bytes.newBuilder();
-      rowBuilder.append(mapId).append(":d:").append(bucketId).append(":").append(keyBytes);
-      Bytes row = rowBuilder.toBytes();
-
-      Bytes value = Bytes.of(serializer.serialize(val));
-      return new RowColumnValue(row, DATA_COLUMN, value);
-    }
-  }
-
-  /**
-   * Settings for one collision free map. Options are either created by the application and saved
-   * to the application configuration, or read back from it using the map id.
-   */
-  public static class Options {
-
-    static final long DEFAULT_BUFFER_SIZE = 1 << 22;
-    static final int DEFAULT_BUCKETS_PER_TABLET = 10;
-
-    int numBuckets;
-    Integer bucketsPerTablet = null;
-
-    Long bufferSize;
-
-    String keyType;
-    String valueType;
-    String combinerType;
-    String updateObserverType;
-    String mapId;
-
-    private static final String PREFIX = "recipes.cfm.";
-
-    /**
-     * Reads the options previously saved for {@code mapId} from the application configuration.
-     * Buffer size and buckets per tablet fall back to their defaults when not set.
-     */
-    Options(String mapId, SimpleConfiguration appConfig) {
-      this.mapId = mapId;
-
-      final String base = PREFIX + mapId;
-
-      this.numBuckets = appConfig.getInt(base + ".buckets");
-      this.combinerType = appConfig.getString(base + ".combiner");
-      this.keyType = appConfig.getString(base + ".key");
-      this.valueType = appConfig.getString(base + ".val");
-      this.updateObserverType = appConfig.getString(base + ".updateObserver", null);
-      this.bufferSize = appConfig.getLong(base + ".bufferSize", DEFAULT_BUFFER_SIZE);
-      this.bucketsPerTablet =
-          appConfig.getInt(base + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET);
-    }
-
-    /**
-     * Creates options for a map without an update observer.
-     *
-     * @param mapId id of the map, must not contain ':'
-     * @param combinerType name of the combiner class
-     * @param keyType name of the key class
-     * @param valType name of the value class
-     * @param buckets number of buckets, must be positive
-     */
-    public Options(String mapId, String combinerType, String keyType, String valType, int buckets) {
-      this(mapId, combinerType, null, keyType, valType, buckets);
-    }
-
-    /**
-     * Creates options for a map that reports changes to an update observer.
-     *
-     * @param mapId id of the map, must not contain ':'
-     * @param combinerType name of the combiner class
-     * @param updateObserverType name of the update observer class
-     * @param keyType name of the key class
-     * @param valueType name of the value class
-     * @param buckets number of buckets, must be positive
-     */
-    public Options(String mapId, String combinerType, String updateObserverType, String keyType,
-        String valueType, int buckets) {
-      Preconditions.checkArgument(buckets > 0);
-      Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'");
-
-      this.mapId = mapId;
-      this.numBuckets = buckets;
-      this.combinerType = combinerType;
-      this.updateObserverType = updateObserverType;
-      this.keyType = keyType;
-      this.valueType = valueType;
-    }
-
-    /**
-     * Limits how many bytes of serialized updates are read into memory by one transaction.
-     * Deserializing and processing the updates uses additional memory, and the limit does not
-     * account for Java object overhead, which can be significant.
-     *
-     * <p>
-     * The amount read is tracked by summing the lengths of the serialized row and value byte
-     * arrays. Once the sum reaches the limit, no further updates are processed in the current
-     * transaction. When not everything was processed, the observer notifies itself so that another
-     * transaction continues processing later.
-     *
-     * @param bufferSize limit in bytes, must be positive
-     * @return this
-     */
-    public Options setBufferSize(long bufferSize) {
-      Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive");
-      this.bufferSize = bufferSize;
-      return this;
-    }
-
-    long getBufferSize() {
-      return bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
-    }
-
-    /**
-     * Sets the number of buckets per tablet. This determines how many split points are generated
-     * when optimizing the Accumulo table.
-     *
-     * @param bucketsPerTablet buckets per tablet, must be positive
-     * @return this
-     */
-    public Options setBucketsPerTablet(int bucketsPerTablet) {
-      Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : "
-          + bucketsPerTablet);
-      this.bucketsPerTablet = bucketsPerTablet;
-      return this;
-    }
-
-    int getBucketsPerTablet() {
-      return bucketsPerTablet == null ? DEFAULT_BUCKETS_PER_TABLET : bucketsPerTablet;
-    }
-
-    /**
-     * Same as the constructor taking class names, using the names of the given classes.
-     */
-    public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, Class<K> keyType,
-        Class<V> valueType, int buckets) {
-      this(mapId, combiner.getName(), keyType.getName(), valueType.getName(), buckets);
-    }
-
-    /**
-     * Same as the constructor taking class names, including an update observer, using the names
-     * of the given classes.
-     */
-    public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner,
-        Class<? extends UpdateObserver<K, V>> updateObserver, Class<K> keyType, Class<V> valueType,
-        int buckets) {
-      this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(), valueType
-          .getName(), buckets);
-    }
-
-    /**
-     * Writes these options to the application configuration under keys derived from the map id.
-     * Optional settings are only written when they were set.
-     */
-    void save(SimpleConfiguration appConfig) {
-      final String base = PREFIX + mapId;
-
-      appConfig.setProperty(base + ".buckets", numBuckets + "");
-      appConfig.setProperty(base + ".combiner", combinerType + "");
-      appConfig.setProperty(base + ".key", keyType);
-      appConfig.setProperty(base + ".val", valueType);
-
-      if (updateObserverType != null) {
-        appConfig.setProperty(base + ".updateObserver", updateObserverType + "");
-      }
-
-      if (bufferSize != null) {
-        appConfig.setProperty(base + ".bufferSize", bufferSize);
-      }
-
-      if (bucketsPerTablet != null) {
-        appConfig.setProperty(base + ".bucketsPerTablet", bucketsPerTablet);
-      }
-    }
-  }
-
-  /**
-   * Configures a collision free map for use. This must be called before initializing Fluo. It
-   * saves the options to the application configuration, registers the observer that processes the
-   * map's updates and registers the map's transient row range.
-   *
-   * @param fluoConfig configuration to add the map to
-   * @param opts options describing the map
-   */
-  public static void configure(FluoConfiguration fluoConfig, Options opts) {
-    opts.save(fluoConfig.getAppConfiguration());
-
-    ObserverConfiguration observerConfig =
-        new ObserverConfiguration(CollisionFreeMapObserver.class.getName())
-            .setParameters(ImmutableMap.of("mapId", opts.mapId));
-    fluoConfig.addObserver(observerConfig);
-
-    Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
-    Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
-
-    TransientRegistry registry = new TransientRegistry(fluoConfig.getAppConfiguration());
-    RowRange transientRange = new RowRange(dataRangeEnd, updateRangeEnd);
-    registry.addTransientRange("cfm." + opts.mapId, transientRange);
-  }
-
-  /**
-   * Returns the suggested Fluo table optimizations for all previously configured collision free
-   * maps. The map ids are discovered from the configuration keys that start with the collision
-   * free map prefix, and the optimizations of each map are merged into one result.
-   *
-   * @param appConfig Must pass in the application configuration obtained from
-   *        {@code FluoClient.getAppConfiguration()} or
-   *        {@code FluoConfiguration.getAppConfiguration()}
-   */
-  public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) {
-    // the prefix without its trailing '.'
-    final String keyPrefix = Options.PREFIX.substring(0, Options.PREFIX.length() - 1);
-    final int idStart = Options.PREFIX.length();
-
-    HashSet<String> mapIds = new HashSet<>();
-    appConfig.getKeys(keyPrefix).forEachRemaining(configKey -> {
-      // the map id is the part of the key between the prefix and the next '.'
-      String afterPrefix = configKey.substring(idStart);
-      mapIds.add(afterPrefix.split("\\.", 2)[0]);
-    });
-
-    Pirtos pirtos = new Pirtos();
-    for (String mid : mapIds) {
-      pirtos.merge(getTableOptimizations(mid, appConfig));
-    }
-
-    return pirtos;
-  }
-
-  /**
-   * Return suggested Fluo table optimizations for the specified collision free map. One split is
-   * suggested for every {@code bucketsPerTablet} buckets in both the data and the update rows of
-   * the map, plus one split at the end of each of those two row ranges.
-   *
-   * @param mapId id of a previously configured map
-   * @param appConfig Must pass in the application configuration obtained from
-   *        {@code FluoClient.getAppConfiguration()} or
-   *        {@code FluoConfiguration.getAppConfiguration()}
-   */
-  public static Pirtos getTableOptimizations(String mapId, SimpleConfiguration appConfig) {
-    Options opts = new Options(mapId, appConfig);
-
-    BytesBuilder rowBuilder = Bytes.newBuilder();
-    rowBuilder.append(mapId);
-    // Truncate back to the length the builder reports for the map id. String.length() counts
-    // chars, which need not equal the builder's length.
-    int mapIdLen = rowBuilder.getLength();
-
-    int bucketsPerTablet = opts.getBucketsPerTablet();
-
-    List<Bytes> dataSplits = new ArrayList<>();
-    List<Bytes> updateSplits = new ArrayList<>();
-    for (int i = bucketsPerTablet; i < opts.numBuckets; i += bucketsPerTablet) {
-      String bucketId = BucketUtil.genBucketId(i, opts.numBuckets);
-
-      rowBuilder.setLength(mapIdLen);
-      dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes());
-
-      rowBuilder.setLength(mapIdLen);
-      updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes());
-    }
-    Collections.sort(dataSplits);
-    Collections.sort(updateSplits);
-
-    Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END);
-    Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END);
-
-    List<Bytes> splits = new ArrayList<>();
-    splits.add(dataRangeEnd);
-    splits.add(updateRangeEnd);
-    splits.addAll(dataSplits);
-    splits.addAll(updateSplits);
-
-    Pirtos pirtos = new Pirtos();
-    pirtos.setSplits(splits);
-
-    // group tablets by the "<mapId>:d:" and "<mapId>:u:" row prefixes
-    pirtos.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:");
-
-    return pirtos;
-  }
-
-  /**
-   * Encodes a long as 8 bytes, most significant byte first.
-   */
-  private static byte[] encSeq(long l) {
-    byte[] ret = new byte[8];
-    for (int i = 0; i < ret.length; i++) {
-      // byte 0 takes bits 63..56, byte 7 takes bits 7..0
-      ret[i] = (byte) (l >>> (56 - 8 * i));
-    }
-    return ret;
-  }
-}