You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by mw...@apache.org on 2016/07/07 20:44:02 UTC
[1/3] incubator-fluo git commit: Fixes #696 - Moved type layer from
API to Fluo Recipes
Repository: incubator-fluo
Updated Branches:
refs/heads/master 7f85e2827 -> b2c91b95b
Fixes #696 - Moved type layer from API to Fluo Recipes
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/e84b4f1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/e84b4f1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/e84b4f1e
Branch: refs/heads/master
Commit: e84b4f1e671ee6ae7bb919934a3c54f1931e32a1
Parents: 7f85e28
Author: Mike Walch <mw...@gmail.com>
Authored: Fri Jul 1 12:46:35 2016 -0400
Committer: Mike Walch <mw...@gmail.com>
Committed: Thu Jul 7 15:55:56 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/fluo/api/types/Encoder.java | 86 ---
.../apache/fluo/api/types/StringEncoder.java | 86 ---
.../org/apache/fluo/api/types/TypeLayer.java | 488 ----------------
.../org/apache/fluo/api/types/TypedLoader.java | 45 --
.../apache/fluo/api/types/TypedObserver.java | 46 --
.../apache/fluo/api/types/TypedSnapshot.java | 38 --
.../fluo/api/types/TypedSnapshotBase.java | 562 -------------------
.../apache/fluo/api/types/TypedTransaction.java | 46 --
.../fluo/api/types/TypedTransactionBase.java | 278 ---------
.../apache/fluo/core/types/MockSnapshot.java | 30 -
.../fluo/core/types/MockSnapshotBase.java | 202 -------
.../apache/fluo/core/types/MockTransaction.java | 36 --
.../fluo/core/types/MockTransactionBase.java | 90 ---
.../apache/fluo/core/types/TypeLayerTest.java | 498 ----------------
14 files changed, 2531 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/Encoder.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/Encoder.java b/modules/api/src/main/java/org/apache/fluo/api/types/Encoder.java
deleted file mode 100644
index 886a03c..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/Encoder.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import org.apache.fluo.api.data.Bytes;
-
-/**
- * Transforms Java primitives to and from bytes using desired encoding
- *
- * @since 1.0.0
- */
-public interface Encoder {
-
- /**
- * Encodes an integer to {@link Bytes}
- */
- Bytes encode(int i);
-
- /**
- * Encodes a long to {@link Bytes}
- */
- Bytes encode(long l);
-
- /**
- * Encodes a String to {@link Bytes}
- */
- Bytes encode(String s);
-
- /**
- * Encodes a float to {@link Bytes}
- */
- Bytes encode(float f);
-
- /**
- * Encodes a double to {@link Bytes}
- */
- Bytes encode(double d);
-
- /**
- * Encodes a boolean to {@link Bytes}
- */
- Bytes encode(boolean b);
-
- /**
- * Decodes an integer from {@link Bytes}
- */
- int decodeInteger(Bytes b);
-
- /**
- * Decodes a long from {@link Bytes}
- */
- long decodeLong(Bytes b);
-
- /**
- * Decodes a String from {@link Bytes}
- */
- String decodeString(Bytes b);
-
- /**
- * Decodes a float from {@link Bytes}
- */
- float decodeFloat(Bytes b);
-
- /**
- * Decodes a double from {@link Bytes}
- */
- double decodeDouble(Bytes b);
-
- /**
- * Decodes a boolean from {@link Bytes}
- */
- boolean decodeBoolean(Bytes b);
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/StringEncoder.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/StringEncoder.java b/modules/api/src/main/java/org/apache/fluo/api/types/StringEncoder.java
deleted file mode 100644
index 4393f7f..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/StringEncoder.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import org.apache.fluo.api.data.Bytes;
-
-/**
- * Transforms Java primitives to and from bytes using a String encoding
- *
- * @since 1.0.0
- */
-public class StringEncoder implements Encoder {
-
- @Override
- public Bytes encode(int i) {
- return encode(Integer.toString(i));
- }
-
- @Override
- public Bytes encode(long l) {
- return encode(Long.toString(l));
- }
-
- @Override
- public Bytes encode(String s) {
- return Bytes.of(s);
- }
-
- @Override
- public Bytes encode(float f) {
- return encode(Float.toString(f));
- }
-
- @Override
- public Bytes encode(double d) {
- return encode(Double.toString(d));
- }
-
- @Override
- public Bytes encode(boolean b) {
- return encode(Boolean.toString(b));
- }
-
- @Override
- public int decodeInteger(Bytes b) {
- return Integer.parseInt(decodeString(b));
- }
-
- @Override
- public long decodeLong(Bytes b) {
- return Long.parseLong(decodeString(b));
- }
-
- @Override
- public String decodeString(Bytes b) {
- return b.toString();
- }
-
- @Override
- public float decodeFloat(Bytes b) {
- return Float.parseFloat(decodeString(b));
- }
-
- @Override
- public double decodeDouble(Bytes b) {
- return Double.parseDouble(decodeString(b));
- }
-
- @Override
- public boolean decodeBoolean(Bytes b) {
- return Boolean.parseBoolean(decodeString(b));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/TypeLayer.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/TypeLayer.java b/modules/api/src/main/java/org/apache/fluo/api/types/TypeLayer.java
deleted file mode 100644
index ca20485..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/TypeLayer.java
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import java.nio.ByteBuffer;
-
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-
-/**
- * A simple convenience layer for Fluo. This layer attempts to make the following common operations
- * easier.
- *
- * <UL>
- * <LI>Working with different types.
- * <LI>Supplying default values
- * <LI>Dealing with null return types.
- * <LI>Working with row/column and column maps
- * </UL>
- *
- * <p>
- * This layer was intentionally loosely coupled with the basic API. This allows other convenience
- * layers for Fluo to build directly on the basic API w/o having to consider the particulars of this
- * layer. Also its expected that integration with other languages may only use the basic API.
- * </p>
- *
- * <h3>Using</h3>
- *
- * <p>
- * A TypeLayer is created with a certain encoder that is used for converting from bytes to
- * primitives and visa versa. In order to ensure that all of your code uses the same encoder, its
- * probably best to centralize the choice of an encoder within your project. There are many ways do
- * to this, below is an example of one way to centralize and use.
- * </p>
- *
- * <pre>
- * <code>
- *
- * public class MyTypeLayer extends TypeLayer {
- * public MyTypeLayer() {
- * super(new MyEncoder());
- * }
- * }
- *
- * public class MyObserver extends TypedObserver {
- * MyObserver(){
- * super(new MyTypeLayer());
- * }
- *
- * public abstract void process(TypedTransaction tx, Bytes row, Column col){
- * //do something w/ typed transaction
- * }
- * }
- *
- * public class MyUtil {
- * //A little util to print out some stuff
- * public void printStuff(Snapshot snap, byte[] row){
- * TypedSnapshot tsnap = new MytTypeLayer().wrap(snap);
- *
- * System.out.println(tsnap.get().row(row).fam("b90000").qual(137).toString("NP"));
- * }
- * }
- * </code>
- * </pre>
- *
- * <h3>Working with different types</h3>
- *
- * <p>
- * The following example code shows using the basic fluo API with different types.
- * </p>
- *
- * <pre>
- * <code>
- *
- * void process(Transaction tx, byte[] row, byte[] cf, int cq, long val){
- * tx.set(Bytes.of(row), new Column(Bytes.of(cf), Bytes.of(Integer.toString(cq))),
- * Bytes.of(Long.toString(val));
- * }
- * </code>
- * </pre>
- *
- * <p>
- * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
- * following way. Because row(), fam(), qual(), and set() each take many different types, this
- * enables many different permutations that would not be achievable with overloading.
- * </p>
- *
- * <pre>
- * <code>
- *
- * void process(TypedTransaction tx, byte[] r, byte[] cf, int cq, long v){
- * tx.mutate().row(r).fam(cf).qual(cq).set(v);
- * }
- * </code>
- * </pre>
- *
- * <h3>Default values</h3>
- *
- * <p>
- * The following example code shows using the basic fluo API to read a value and default to zero if
- * it does not exist.
- * </p>
- *
- * <pre>
- * <code>
- *
- * void add(Transaction tx, byte[] row, Column col, long amount){
- *
- * long balance = 0;
- * Bytes bval = tx.get(Bytes.of(row), col);
- * if(bval != null)
- * balance = Long.parseLong(bval.toString());
- *
- * balance += amount;
- *
- * tx.set(Bytes.of(row), col, Bytes.of(Long.toString(amount)));
- *
- * }
- * </code>
- * </pre>
- *
- * <p>
- * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
- * following way. This code avoids the null check by supplying a default value of zero.
- * </p>
- *
- * <pre>
- * <code>
- *
- * void add(TypedTransaction tx, byte[] r, Column c, long amount){
- * long balance = tx.get().row(r).col(c).toLong(0);
- * balance += amount;
- * tx.mutate().row(r).col(c).set(balance);
- * }
- * </code>
- * </pre>
- *
- * <p>
- * For this particular case, shorter code can be written by using the increment method.
- * </p>
- *
- * <pre>
- * <code>
- *
- * void add(TypedTransaction tx, byte[] r, Column c, long amount){
- * tx.mutate().row(r).col(c).increment(amount);
- * }
- * </code>
- * </pre>
- *
- * <h3>Null return types</h3>
- *
- * <p>
- * When using the basic API, you must ensure the return type is not null before converting a string
- * or long.
- * </p>
- *
- * <pre>
- * <code>
- *
- * void process(Transaction tx, byte[] row, Column col, long amount) {
- * Bytes val = tx.get(Bytes.of(row), col);
- * if(val == null)
- * return;
- * long balance = Long.parseLong(val.toString());
- * }
- * </code>
- * </pre>
- *
- * <p>
- * With {@link TypedTransactionBase} if no default value is supplied, then the null is passed
- * through.
- * </p>
- *
- * <pre>
- * <code>
- *
- * void process(TypedTransaction tx, byte[] r, Column c, long amount){
- * Long balance = tx.get().row(r).col(c).toLong();
- * if(balance == null)
- * return;
- * }
- * </code>
- * </pre>
- *
- * <h3>Defaulted maps</h3>
- *
- * <p>
- * The operations that return maps, return defaulted maps which make it easy to specify defaults and
- * avoid null.
- * </p>
- *
- * <pre>
- * {@code
- * // pretend this method has curly braces. javadoc has issues with less than.
- *
- * void process(TypedTransaction tx, byte[] r, Column c1, Column c2, Column c3, long amount)
- *
- * Map<Column, Value> columns = tx.get().row(r).columns(c1,c2,c3);
- *
- * // If c1 does not exist in map, a Value that wraps null will be returned.
- * // When c1 does not exist val1 will be set to null and no NPE will be thrown.
- * String val1 = columns.get(c1).toString();
- *
- * // If c2 does not exist in map, then val2 will be set to empty string.
- * String val2 = columns.get(c2).toString("");
- *
- * // If c3 does not exist in map, then val9 will be set to 9.
- * Long val3 = columns.get(c3).toLong(9);
- * }
- * </pre>
- *
- * <p>
- * This also applies to getting sets of rows.
- * </p>
- *
- * <pre>
- * {@code
- * // pretend this method has curly braces. javadoc has issues with less than.
- *
- * void process(TypedTransaction tx, List<String> rows, Column c1, Column c2, Column c3,
- * long amount)
- *
- * Map<String,Map<Column,Value>> rowCols =
- * tx.get().rowsString(rows).columns(c1,c2,c3).toStringMap();
- *
- * // this will set val1 to null if row does not exist in map and/or column does not
- * // exist in child map
- * String val1 = rowCols.get("row1").get(c1).toString();
- * }
- * </pre>
- *
- * @since 1.0.0
- */
-public class TypeLayer {
-
- private Encoder encoder;
-
- static class Data {
- Bytes row;
- Bytes family;
- Bytes qual;
- Bytes vis;
-
- Column getCol() {
- if (qual == null) {
- return new Column(family);
- } else if (vis == null) {
- return new Column(family, qual);
- } else {
- return new Column(family, qual, vis);
- }
- }
- }
-
- /**
- * @since 1.0.0
- */
- public abstract class RowMethods<R> {
-
- abstract R create(Data data);
-
- public R row(String row) {
- return row(encoder.encode(row));
- }
-
- public R row(int row) {
- return row(encoder.encode(row));
- }
-
- public R row(long row) {
- return row(encoder.encode(row));
- }
-
- public R row(byte[] row) {
- return row(Bytes.of(row));
- }
-
- public R row(ByteBuffer row) {
- return row(Bytes.of(row));
- }
-
- public R row(Bytes row) {
- Data data = new Data();
- data.row = row;
- R result = create(data);
- return result;
- }
- }
-
- /**
- * @since 1.0.0
- */
- public abstract class SimpleFamilyMethods<R1> {
-
- Data data;
-
- SimpleFamilyMethods(Data data) {
- this.data = data;
- }
-
- abstract R1 create1(Data data);
-
- public R1 fam(String family) {
- return fam(encoder.encode(family));
- }
-
- public R1 fam(int family) {
- return fam(encoder.encode(family));
- }
-
- public R1 fam(long family) {
- return fam(encoder.encode(family));
- }
-
- public R1 fam(byte[] family) {
- return fam(Bytes.of(family));
- }
-
- public R1 fam(ByteBuffer family) {
- return fam(Bytes.of(family));
- }
-
- public R1 fam(Bytes family) {
- data.family = family;
- return create1(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public abstract class FamilyMethods<R1, R2> extends SimpleFamilyMethods<R1> {
-
- FamilyMethods(Data data) {
- super(data);
- }
-
- abstract R2 create2(Data data);
-
- public R2 col(Column col) {
- data.family = col.getFamily();
- data.qual = col.getQualifier();
- data.vis = col.getVisibility();
- return create2(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public abstract class QualifierMethods<R> {
-
- private Data data;
-
- QualifierMethods(Data data) {
- this.data = data;
- }
-
- abstract R create(Data data);
-
- public R qual(String qualifier) {
- return qual(encoder.encode(qualifier));
- }
-
- public R qual(int qualifier) {
- return qual(encoder.encode(qualifier));
- }
-
- public R qual(long qualifier) {
- return qual(encoder.encode(qualifier));
- }
-
- public R qual(byte[] qualifier) {
- return qual(Bytes.of(qualifier));
- }
-
- public R qual(ByteBuffer qualifier) {
- return qual(Bytes.of(qualifier));
- }
-
- public R qual(Bytes qualifier) {
- data.qual = qualifier;
- return create(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public static class VisibilityMethods {
-
- private Data data;
-
- VisibilityMethods(Data data) {
- this.data = data;
- }
-
- public Column vis() {
- return new Column(data.family, data.qual);
- }
-
- public Column vis(String cv) {
- return vis(Bytes.of(cv));
- }
-
- public Column vis(Bytes cv) {
- return new Column(data.family, data.qual, cv);
- }
-
- public Column vis(ByteBuffer cv) {
- return vis(Bytes.of(cv));
- }
-
- public Column vis(byte[] cv) {
- return vis(Bytes.of(cv));
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class CQB extends QualifierMethods<VisibilityMethods> {
- CQB(Data data) {
- super(data);
- }
-
- @Override
- VisibilityMethods create(Data data) {
- return new VisibilityMethods(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class CFB extends SimpleFamilyMethods<CQB> {
- CFB() {
- super(new Data());
- }
-
- @Override
- CQB create1(Data data) {
- return new CQB(data);
- }
- }
-
- public TypeLayer(Encoder encoder) {
- this.encoder = encoder;
- }
-
- /**
- * Initiates the chain of calls needed to build a column.
- *
- * @return a column builder
- */
- public CFB bc() {
- return new CFB();
- }
-
- public TypedSnapshot wrap(Snapshot snap) {
- return new TypedSnapshot(snap, encoder, this);
- }
-
- public TypedTransactionBase wrap(TransactionBase tx) {
- return new TypedTransactionBase(tx, encoder, this);
- }
-
- public TypedTransaction wrap(Transaction tx) {
- return new TypedTransaction(tx, encoder, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/TypedLoader.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/TypedLoader.java b/modules/api/src/main/java/org/apache/fluo/api/types/TypedLoader.java
deleted file mode 100644
index 86c31bb..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/TypedLoader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import org.apache.fluo.api.client.Loader;
-import org.apache.fluo.api.client.TransactionBase;
-
-/**
- * A {@link Loader} that uses a {@link TypeLayer}
- *
- * @since 1.0.0
- */
-public abstract class TypedLoader implements Loader {
-
- private final TypeLayer tl;
-
- public TypedLoader() {
- tl = new TypeLayer(new StringEncoder());
- }
-
- public TypedLoader(TypeLayer tl) {
- this.tl = tl;
- }
-
- @Override
- public void load(TransactionBase tx, Context context) throws Exception {
- load(tl.wrap(tx), context);
- }
-
- public abstract void load(TypedTransactionBase tx, Context context) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/TypedObserver.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/TypedObserver.java b/modules/api/src/main/java/org/apache/fluo/api/types/TypedObserver.java
deleted file mode 100644
index 302f382..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/TypedObserver.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
-
-/**
- * An {@link AbstractObserver} that uses a {@link TypeLayer}
- *
- * @since 1.0.0
- */
-public abstract class TypedObserver extends AbstractObserver {
-
- private final TypeLayer tl;
-
- public TypedObserver() {
- tl = new TypeLayer(new StringEncoder());
- }
-
- public TypedObserver(TypeLayer tl) {
- this.tl = tl;
- }
-
- @Override
- public void process(TransactionBase tx, Bytes row, Column col) {
- process(tl.wrap(tx), row, col);
- }
-
- public abstract void process(TypedTransactionBase tx, Bytes row, Column col);
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/TypedSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/TypedSnapshot.java b/modules/api/src/main/java/org/apache/fluo/api/types/TypedSnapshot.java
deleted file mode 100644
index 97c7f1c..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/TypedSnapshot.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import org.apache.fluo.api.client.Snapshot;
-
-/**
- * A {@link Snapshot} that uses a {@link TypeLayer}
- *
- * @since 1.0.0
- */
-public class TypedSnapshot extends TypedSnapshotBase implements Snapshot {
-
- private final Snapshot closeSnapshot;
-
- TypedSnapshot(Snapshot snapshot, Encoder encoder, TypeLayer tl) {
- super(snapshot, encoder, tl);
- closeSnapshot = snapshot;
- }
-
- @Override
- public void close() {
- closeSnapshot.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/TypedSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/TypedSnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/types/TypedSnapshotBase.java
deleted file mode 100644
index 95c46a1..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/TypedSnapshotBase.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-import org.apache.commons.collections.map.DefaultedMap;
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.types.TypeLayer.Data;
-import org.apache.fluo.api.types.TypeLayer.FamilyMethods;
-import org.apache.fluo.api.types.TypeLayer.QualifierMethods;
-import org.apache.fluo.api.types.TypeLayer.RowMethods;
-
-// TODO need to refactor column to use Encoder
-
-/**
- * A {@link SnapshotBase} that uses a {@link TypeLayer}
- *
- * @since 1.0.0
- */
-public class TypedSnapshotBase implements SnapshotBase {
-
- private SnapshotBase snapshot;
- private Encoder encoder;
- private TypeLayer tl;
-
- /**
- * @since 1.0.0
- */
- public class VisibilityMethods extends Value {
-
- VisibilityMethods(Data data) {
- super(data);
- }
-
- public Value vis(Bytes cv) {
- data.vis = cv;
- return new Value(data);
- }
-
- public Value vis(byte[] cv) {
- data.vis = Bytes.of(cv);
- return new Value(data);
- }
-
- public Value vis(ByteBuffer bb) {
- data.vis = Bytes.of(bb);
- return new Value(data);
- }
-
- public Value vis(String cv) {
- data.vis = Bytes.of(cv);
- return new Value(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class Value {
- private Bytes bytes;
- private boolean gotBytes = false;
- Data data;
-
- public Bytes getBytes() {
- if (!gotBytes) {
- try {
- bytes = snapshot.get(data.row, data.getCol());
- gotBytes = true;
- } catch (Exception e) {
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- }
- throw new RuntimeException(e);
- }
- }
-
- return bytes;
- }
-
- private Value(Bytes bytes) {
- this.bytes = bytes;
- this.gotBytes = true;
- }
-
- private Value(Data data) {
- this.data = data;
- this.gotBytes = false;
- }
-
- public Integer toInteger() {
- if (getBytes() == null) {
- return null;
- }
- return encoder.decodeInteger(getBytes());
- }
-
- public int toInteger(int defaultValue) {
- if (getBytes() == null) {
- return defaultValue;
- }
- return encoder.decodeInteger(getBytes());
- }
-
- public Long toLong() {
- if (getBytes() == null) {
- return null;
- }
- return encoder.decodeLong(getBytes());
- }
-
- public long toLong(long defaultValue) {
- if (getBytes() == null) {
- return defaultValue;
- }
- return encoder.decodeLong(getBytes());
- }
-
- @Override
- public String toString() {
- if (getBytes() == null) {
- return null;
- }
- return encoder.decodeString(getBytes());
- }
-
- public String toString(String defaultValue) {
- if (getBytes() == null) {
- return defaultValue;
- }
- return encoder.decodeString(getBytes());
- }
-
- public Float toFloat() {
- if (getBytes() == null) {
- return null;
- }
- return encoder.decodeFloat(getBytes());
- }
-
- public float toFloat(float defaultValue) {
- if (getBytes() == null) {
- return defaultValue;
- }
- return encoder.decodeFloat(getBytes());
- }
-
- public Double toDouble() {
- if (getBytes() == null) {
- return null;
- }
- return encoder.decodeDouble(getBytes());
- }
-
- public double toDouble(double defaultValue) {
- if (getBytes() == null) {
- return defaultValue;
- }
- return encoder.decodeDouble(getBytes());
- }
-
- public Boolean toBoolean() {
- if (getBytes() == null) {
- return null;
- }
- return encoder.decodeBoolean(getBytes());
- }
-
- public boolean toBoolean(boolean defaultValue) {
- if (getBytes() == null) {
- return defaultValue;
- }
- return encoder.decodeBoolean(getBytes());
- }
-
- public byte[] toBytes() {
- if (getBytes() == null) {
- return null;
- }
- return getBytes().toArray();
- }
-
- public byte[] toBytes(byte[] defaultValue) {
- if (getBytes() == null) {
- return defaultValue;
- }
- return getBytes().toArray();
- }
-
- public ByteBuffer toByteBuffer() {
- if (getBytes() == null) {
- return null;
- }
- return ByteBuffer.wrap(getBytes().toArray());
- }
-
- public ByteBuffer toByteBuffer(ByteBuffer defaultValue) {
- if (getBytes() == null) {
- return defaultValue;
- }
- return toByteBuffer();
- }
-
- @Override
- public int hashCode() {
- if (getBytes() == null) {
- return 0;
- }
-
- return getBytes().hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof Value) {
- Value ov = (Value) o;
- if (getBytes() == null) {
- return ov.getBytes() == null;
- } else {
- return getBytes().equals(ov.getBytes());
- }
- }
-
- return false;
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class ValueQualifierBuilder extends QualifierMethods<VisibilityMethods> {
-
- ValueQualifierBuilder(Data data) {
- tl.super(data);
- }
-
- @Override
- VisibilityMethods create(Data data) {
- return new VisibilityMethods(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class ValueFamilyMethods extends FamilyMethods<ValueQualifierBuilder, Value> {
-
- ValueFamilyMethods(Data data) {
- tl.super(data);
- }
-
- @Override
- ValueQualifierBuilder create1(Data data) {
- return new ValueQualifierBuilder(data);
- }
-
- @Override
- Value create2(Data data) {
- return new Value(data);
- }
-
- public Map<Column, Value> columns(Set<Column> columns) {
- try {
- return wrap(snapshot.get(data.row, columns));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public Map<Column, Value> columns(Column... columns) {
- try {
- return wrap(snapshot.get(data.row, new HashSet<>(Arrays.asList(columns))));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class MapConverter {
- private Collection<Bytes> rows;
- private Set<Column> columns;
-
- public MapConverter(Collection<Bytes> rows, Set<Column> columns) {
- this.rows = rows;
- this.columns = columns;
- }
-
- private Map<Bytes, Map<Column, Bytes>> getInput() {
- try {
- return snapshot.get(rows, columns);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private Map wrap2(Map m) {
- return Collections.unmodifiableMap(DefaultedMap.decorate(m, new DefaultedMap(new Value(
- (Bytes) null))));
- }
-
- @SuppressWarnings("unchecked")
- public Map<String, Map<Column, Value>> toStringMap() {
- Map<Bytes, Map<Column, Bytes>> in = getInput();
- Map<String, Map<Column, Value>> out = new HashMap<>();
-
- for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
- out.put(encoder.decodeString(rowEntry.getKey()), wrap(rowEntry.getValue()));
- }
-
- return wrap2(out);
- }
-
- @SuppressWarnings("unchecked")
- public Map<Long, Map<Column, Value>> toLongMap() {
- Map<Bytes, Map<Column, Bytes>> in = getInput();
- Map<Long, Map<Column, Value>> out = new HashMap<>();
-
- for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
- out.put(encoder.decodeLong(rowEntry.getKey()), wrap(rowEntry.getValue()));
- }
-
- return wrap2(out);
- }
-
- @SuppressWarnings("unchecked")
- public Map<Integer, Map<Column, Value>> toIntegerMap() {
- Map<Bytes, Map<Column, Bytes>> in = getInput();
- Map<Integer, Map<Column, Value>> out = new HashMap<>();
-
- for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
- out.put(encoder.decodeInteger(rowEntry.getKey()), wrap(rowEntry.getValue()));
- }
-
- return wrap2(out);
- }
-
- @SuppressWarnings("unchecked")
- public Map<Bytes, Map<Column, Value>> toBytesMap() {
- Map<Bytes, Map<Column, Bytes>> in = getInput();
- Map<Bytes, Map<Column, Value>> out = new HashMap<>();
-
- for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
- out.put(rowEntry.getKey(), wrap(rowEntry.getValue()));
- }
-
- return wrap2(out);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class ColumnsMethods {
- private Collection<Bytes> rows;
-
- public ColumnsMethods(Collection<Bytes> rows) {
- this.rows = rows;
- }
-
- public MapConverter columns(Set<Column> columns) {
- return new MapConverter(rows, columns);
- }
-
- public MapConverter columns(Column... columns) {
- return columns(new HashSet<>(Arrays.asList(columns)));
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class ValueRowMethods extends RowMethods<ValueFamilyMethods> {
-
- ValueRowMethods() {
- tl.super();
- }
-
- @Override
- ValueFamilyMethods create(Data data) {
- return new ValueFamilyMethods(data);
- }
-
- public ColumnsMethods rows(Collection<Bytes> rows) {
- return new ColumnsMethods(rows);
- }
-
- public ColumnsMethods rows(Bytes... rows) {
- return new ColumnsMethods(Arrays.asList(rows));
- }
-
- public ColumnsMethods rowsString(String... rows) {
- return rowsString(Arrays.asList(rows));
- }
-
- public ColumnsMethods rowsString(Collection<String> rows) {
- ArrayList<Bytes> conv = new ArrayList<>();
- for (String row : rows) {
- conv.add(encoder.encode(row));
- }
-
- return rows(conv);
- }
-
- public ColumnsMethods rowsLong(Long... rows) {
- return rowsLong(Arrays.asList(rows));
- }
-
- public ColumnsMethods rowsLong(Collection<Long> rows) {
- ArrayList<Bytes> conv = new ArrayList<>();
- for (Long row : rows) {
- conv.add(encoder.encode(row));
- }
-
- return rows(conv);
- }
-
- public ColumnsMethods rowsInteger(Integer... rows) {
- return rowsInteger(Arrays.asList(rows));
- }
-
- public ColumnsMethods rowsInteger(Collection<Integer> rows) {
- ArrayList<Bytes> conv = new ArrayList<>();
- for (Integer row : rows) {
- conv.add(encoder.encode(row));
- }
-
- return rows(conv);
- }
-
- public ColumnsMethods rowsBytes(byte[]... rows) {
- return rowsBytes(Arrays.asList(rows));
- }
-
- public ColumnsMethods rowsBytes(Collection<byte[]> rows) {
- ArrayList<Bytes> conv = new ArrayList<>();
- for (byte[] row : rows) {
- conv.add(Bytes.of(row));
- }
-
- return rows(conv);
- }
-
- public ColumnsMethods rowsByteBuffers(ByteBuffer... rows) {
- return rowsByteBuffers(Arrays.asList(rows));
- }
-
- public ColumnsMethods rowsByteBuffers(Collection<ByteBuffer> rows) {
- ArrayList<Bytes> conv = new ArrayList<>();
- for (ByteBuffer row : rows) {
- conv.add(Bytes.of(row));
- }
-
- return rows(conv);
- }
-
- }
-
- TypedSnapshotBase(SnapshotBase snapshot, Encoder encoder, TypeLayer tl) {
- this.snapshot = snapshot;
- this.encoder = encoder;
- this.tl = tl;
- }
-
- @Override
- public Bytes get(Bytes row, Column column) {
- return snapshot.get(row, column);
- }
-
- @Override
- public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
- return snapshot.get(row, columns);
- }
-
- @Override
- public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
- return snapshot.get(rowColumns);
- }
-
- @Override
- public RowIterator get(ScannerConfiguration config) {
- return snapshot.get(config);
- }
-
- @Override
- public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
- return snapshot.get(rows, columns);
- }
-
- public ValueRowMethods get() {
- return new ValueRowMethods();
- }
-
- @SuppressWarnings({"unchecked"})
- private Map<Column, Value> wrap(Map<Column, Bytes> map) {
- Map<Column, Value> ret = Maps.transformValues(map, new Function<Bytes, Value>() {
- @Override
- public Value apply(Bytes input) {
- return new Value(input);
- }
- });
-
- return Collections.unmodifiableMap(DefaultedMap.decorate(ret, new Value((Bytes) null)));
- }
-
- @Override
- public long getStartTimestamp() {
- return snapshot.getStartTimestamp();
- }
-
- @Override
- public String gets(String row, Column column) {
- return snapshot.gets(row, column);
- }
-
- @Override
- public Map<Column, String> gets(String row, Set<Column> columns) {
- return snapshot.gets(row, columns);
- }
-
- @Override
- public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
- return snapshot.gets(rows, columns);
- }
-
- @Override
- public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
- return snapshot.gets(rowColumns);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/TypedTransaction.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/TypedTransaction.java b/modules/api/src/main/java/org/apache/fluo/api/types/TypedTransaction.java
deleted file mode 100644
index 62b32c2..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/TypedTransaction.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.exceptions.CommitException;
-
-/**
- * A {@link Transaction} that uses a {@link TypeLayer}
- *
- * @since 1.0.0
- */
-public class TypedTransaction extends TypedTransactionBase implements Transaction {
-
- private final Transaction closeTx;
-
- @VisibleForTesting
- protected TypedTransaction(Transaction tx, Encoder encoder, TypeLayer tl) {
- super(tx, encoder, tl);
- closeTx = tx;
- }
-
- @Override
- public void commit() throws CommitException {
- closeTx.commit();
- }
-
- @Override
- public void close() {
- closeTx.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/api/src/main/java/org/apache/fluo/api/types/TypedTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/types/TypedTransactionBase.java b/modules/api/src/main/java/org/apache/fluo/api/types/TypedTransactionBase.java
deleted file mode 100644
index a45bff1..0000000
--- a/modules/api/src/main/java/org/apache/fluo/api/types/TypedTransactionBase.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.api.types;
-
-import java.nio.ByteBuffer;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.exceptions.AlreadySetException;
-import org.apache.fluo.api.types.TypeLayer.Data;
-import org.apache.fluo.api.types.TypeLayer.FamilyMethods;
-import org.apache.fluo.api.types.TypeLayer.QualifierMethods;
-import org.apache.fluo.api.types.TypeLayer.RowMethods;
-
-/**
- * A {@link TransactionBase} that uses a {@link TypeLayer}
- *
- * @since 1.0.0
- */
-public class TypedTransactionBase extends TypedSnapshotBase implements TransactionBase {
-
- private final TransactionBase tx;
- private final Encoder encoder;
- private final TypeLayer tl;
-
- /**
- * @since 1.0.0
- */
- public class Mutator {
-
- private boolean set = false;
- Data data;
-
- Mutator(Data data) {
- this.data = data;
- }
-
- void checkNotSet() {
- if (set) {
- throw new IllegalStateException("Already set value");
- }
- }
-
- public void set(Bytes bytes) throws AlreadySetException {
- checkNotSet();
- tx.set(data.row, data.getCol(), bytes);
- set = true;
- }
-
- public void set(String s) throws AlreadySetException {
- set(encoder.encode(s));
- }
-
- public void set(int i) throws AlreadySetException {
- set(encoder.encode(i));
- }
-
- public void set(long l) throws AlreadySetException {
- set(encoder.encode(l));
- }
-
- public void set(float f) throws AlreadySetException {
- set(encoder.encode(f));
- }
-
- public void set(double d) throws AlreadySetException {
- set(encoder.encode(d));
- }
-
- public void set(boolean b) throws AlreadySetException {
- set(encoder.encode(b));
- }
-
- public void set(byte[] ba) throws AlreadySetException {
- set(Bytes.of(ba));
- }
-
- public void set(ByteBuffer bb) throws AlreadySetException {
- set(Bytes.of(bb));
- }
-
- /**
- * Set an empty value
- */
- public void set() throws AlreadySetException {
- set(Bytes.EMPTY);
- }
-
- /**
- * Reads the current value of the row/column, adds i, sets the sum. If the row/column does not
- * have a current value, then it defaults to zero.
- *
- * @param i Integer increment amount
- * @throws AlreadySetException if value was previously set in transaction
- */
- public void increment(int i) throws AlreadySetException {
- checkNotSet();
- Bytes val = tx.get(data.row, data.getCol());
- int v = 0;
- if (val != null) {
- v = encoder.decodeInteger(val);
- }
- tx.set(data.row, data.getCol(), encoder.encode(v + i));
- }
-
- /**
- * Reads the current value of the row/column, adds l, sets the sum. If the row/column does not
- * have a current value, then it defaults to zero.
- *
- * @param l Long increment amount
- * @throws AlreadySetException if value was previously set in transaction
- */
- public void increment(long l) throws AlreadySetException {
- checkNotSet();
- Bytes val = tx.get(data.row, data.getCol());
- long v = 0;
- if (val != null) {
- v = encoder.decodeLong(val);
- }
- tx.set(data.row, data.getCol(), encoder.encode(v + l));
- }
-
- public void delete() throws AlreadySetException {
- checkNotSet();
- tx.delete(data.row, data.getCol());
- set = true;
- }
-
- public void weaklyNotify() {
- checkNotSet();
- tx.setWeakNotification(data.row, data.getCol());
- set = true;
- }
-
- }
-
- /**
- * @since 1.0.0
- */
- public class VisibilityMutator extends Mutator {
-
- VisibilityMutator(Data data) {
- super(data);
- }
-
- public Mutator vis(String cv) {
- checkNotSet();
- data.vis = Bytes.of(cv);
- return new Mutator(data);
- }
-
- public Mutator vis(Bytes cv) {
- checkNotSet();
- data.vis = cv;
- return new Mutator(data);
- }
-
- public Mutator vis(byte[] cv) {
- checkNotSet();
- data.vis = Bytes.of(cv);
- return new Mutator(data);
- }
-
- public Mutator vis(ByteBuffer cv) {
- checkNotSet();
- data.vis = Bytes.of(cv);
- return new Mutator(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class MutatorQualifierMethods extends QualifierMethods<VisibilityMutator> {
-
- MutatorQualifierMethods(Data data) {
- tl.super(data);
- }
-
- @Override
- VisibilityMutator create(Data data) {
- return new VisibilityMutator(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class MutatorFamilyMethods extends FamilyMethods<MutatorQualifierMethods, Mutator> {
-
- MutatorFamilyMethods(Data data) {
- tl.super(data);
- }
-
- @Override
- MutatorQualifierMethods create1(Data data) {
- return new MutatorQualifierMethods(data);
- }
-
- @Override
- Mutator create2(Data data) {
- return new Mutator(data);
- }
- }
-
- /**
- * @since 1.0.0
- */
- public class MutatorRowMethods extends RowMethods<MutatorFamilyMethods> {
-
- MutatorRowMethods() {
- tl.super();
- }
-
- @Override
- MutatorFamilyMethods create(Data data) {
- return new MutatorFamilyMethods(data);
- }
-
- }
-
- @VisibleForTesting
- protected TypedTransactionBase(TransactionBase tx, Encoder encoder, TypeLayer tl) {
- super(tx, encoder, tl);
- this.tx = tx;
- this.encoder = encoder;
- this.tl = tl;
- }
-
- public MutatorRowMethods mutate() {
- return new MutatorRowMethods();
- }
-
- @Override
- public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
- tx.set(row, col, value);
- }
-
- @Override
- public void set(String row, Column col, String value) throws AlreadySetException {
- tx.set(row, col, value);
- }
-
- @Override
- public void setWeakNotification(Bytes row, Column col) {
- tx.setWeakNotification(row, col);
- }
-
- @Override
- public void setWeakNotification(String row, Column col) {
- tx.setWeakNotification(row, col);
- }
-
- @Override
- public void delete(Bytes row, Column col) throws AlreadySetException {
- tx.delete(row, col);
- }
-
- @Override
- public void delete(String row, Column col) {
- tx.delete(row, col);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/core/src/test/java/org/apache/fluo/core/types/MockSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/types/MockSnapshot.java b/modules/core/src/test/java/org/apache/fluo/core/types/MockSnapshot.java
deleted file mode 100644
index 6dfb9c4..0000000
--- a/modules/core/src/test/java/org/apache/fluo/core/types/MockSnapshot.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.types;
-
-import org.apache.fluo.api.client.Snapshot;
-
-public class MockSnapshot extends MockSnapshotBase implements Snapshot {
-
- MockSnapshot(String... entries) {
- super(entries);
- }
-
- @Override
- public void close() {
- // no resources need to be closed
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/core/src/test/java/org/apache/fluo/core/types/MockSnapshotBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/types/MockSnapshotBase.java b/modules/core/src/test/java/org/apache/fluo/core/types/MockSnapshotBase.java
deleted file mode 100644
index 4ca5329..0000000
--- a/modules/core/src/test/java/org/apache/fluo/core/types/MockSnapshotBase.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.types;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.core.impl.TxStringUtil;
-
-public class MockSnapshotBase implements SnapshotBase {
-
- final Map<Bytes, Map<Column, Bytes>> getData;
-
- /**
- * Initializes {@link #getData} using {@link #toRCVM(String...)}
- */
- MockSnapshotBase(String... entries) {
- getData = toRCVM(entries);
- }
-
- @Override
- public Bytes get(Bytes row, Column column) {
- Map<Column, Bytes> cols = getData.get(row);
- if (cols != null) {
- return cols.get(column);
- }
-
- return null;
- }
-
- @Override
- public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
- Map<Column, Bytes> ret = new HashMap<>();
- Map<Column, Bytes> cols = getData.get(row);
- if (cols != null) {
- for (Column column : columns) {
- Bytes val = cols.get(column);
- if (val != null) {
- ret.put(column, val);
- }
- }
- }
- return ret;
- }
-
- @Override
- public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
-
- Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
-
- for (Bytes row : rows) {
- Map<Column, Bytes> colMap = get(row, columns);
- if (colMap != null && colMap.size() > 0) {
- ret.put(row, colMap);
- }
- }
-
- return ret;
- }
-
- @Override
- public RowIterator get(ScannerConfiguration config) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * toRCVM stands for "To Row Column Value Map". This is a convenience function that takes strings
- * of the format {@code <row>,<col fam>:<col qual>[:col vis],
- * <value>} and generates a row, column, value map.
- */
- public static Map<Bytes, Map<Column, Bytes>> toRCVM(String... entries) {
- Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
-
- for (String entry : entries) {
- String[] rcv = entry.split(",");
- if (rcv.length != 3 && !(rcv.length == 2 && entry.trim().endsWith(","))) {
- throw new IllegalArgumentException(
- "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
- }
-
- Bytes row = Bytes.of(rcv[0]);
- String[] colFields = rcv[1].split(":");
-
- Column col;
- if (colFields.length == 3) {
- col = new Column(colFields[0], colFields[1], colFields[2]);
- } else if (colFields.length == 2) {
- col = new Column(colFields[0], colFields[1]);
- } else {
- throw new IllegalArgumentException(
- "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
- }
-
- Bytes val;
- if (rcv.length == 2) {
- val = Bytes.EMPTY;
- } else {
- val = Bytes.of(rcv[2]);
- }
-
- Map<Column, Bytes> cols = ret.get(row);
- if (cols == null) {
- cols = new HashMap<>();
- ret.put(row, cols);
- }
-
- cols.put(col, val);
- }
- return ret;
- }
-
- /**
- * toRCM stands for "To Row Column Map". This is a convenience function that takes strings of the
- * format {@code <row>,<col fam>:<col qual>[:col vis]} and generates a row, column map.
- */
- public static Map<Bytes, Set<Column>> toRCM(String... entries) {
- Map<Bytes, Set<Column>> ret = new HashMap<>();
-
- for (String entry : entries) {
- String[] rcv = entry.split(",");
- if (rcv.length != 2) {
- throw new IllegalArgumentException(
- "expected <row>,<col fam>:<col qual>[:col vis] but saw : " + entry);
- }
-
- Bytes row = Bytes.of(rcv[0]);
- String[] colFields = rcv[1].split(":");
-
- Column col;
- if (colFields.length == 3) {
- col = new Column(colFields[0], colFields[1], colFields[2]);
- } else if (colFields.length == 2) {
- col = new Column(colFields[0], colFields[1]);
- } else {
- throw new IllegalArgumentException(
- "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
- }
-
- Set<Column> cols = ret.get(row);
- if (cols == null) {
- cols = new HashSet<>();
- ret.put(row, cols);
- }
-
- cols.add(col);
- }
- return ret;
- }
-
- @Override
- public long getStartTimestamp() {
- throw new UnsupportedOperationException();
- }
-
-
- @Override
- public String gets(String row, Column column) {
- return TxStringUtil.gets(this, row, column);
- }
-
- @Override
- public Map<Column, String> gets(String row, Set<Column> columns) {
- return TxStringUtil.gets(this, row, columns);
- }
-
- @Override
- public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
- return TxStringUtil.gets(this, rows, columns);
- }
-
- @Override
- public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
- return TxStringUtil.gets(this, rowColumns);
- }
-
- @Override
- public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/core/src/test/java/org/apache/fluo/core/types/MockTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/types/MockTransaction.java b/modules/core/src/test/java/org/apache/fluo/core/types/MockTransaction.java
deleted file mode 100644
index 0d6779e..0000000
--- a/modules/core/src/test/java/org/apache/fluo/core/types/MockTransaction.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.types;
-
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.exceptions.CommitException;
-
-public class MockTransaction extends MockTransactionBase implements Transaction {
-
- MockTransaction(String... entries) {
- super(entries);
- }
-
- @Override
- public void commit() throws CommitException {
- // does nothing
- }
-
- @Override
- public void close() {
- // no resources to close
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/core/src/test/java/org/apache/fluo/core/types/MockTransactionBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/types/MockTransactionBase.java b/modules/core/src/test/java/org/apache/fluo/core/types/MockTransactionBase.java
deleted file mode 100644
index af9351d..0000000
--- a/modules/core/src/test/java/org/apache/fluo/core/types/MockTransactionBase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.types;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.exceptions.AlreadySetException;
-
-/**
- * A very simple implementation of {@link TransactionBase} used for testing. All reads are serviced
- * from {@link #getData}. Updates are stored in {@link #setData}, {@link #deletes}, or
- * {@link #weakNotifications} depending on the update type.
- */
-public class MockTransactionBase extends MockSnapshotBase implements TransactionBase {
-
- final Map<Bytes, Map<Column, Bytes>> setData = new HashMap<>();
- final Map<Bytes, Set<Column>> deletes = new HashMap<>();
- final Map<Bytes, Set<Column>> weakNotifications = new HashMap<>();
-
- MockTransactionBase(String... entries) {
- super(entries);
- }
-
- @Override
- public void setWeakNotification(Bytes row, Column col) {
- Set<Column> cols = weakNotifications.get(row);
- if (cols == null) {
- cols = new HashSet<>();
- weakNotifications.put(row, cols);
- }
-
- cols.add(col);
- }
-
- @Override
- public void set(Bytes row, Column col, Bytes value) {
- Map<Column, Bytes> cols = setData.get(row);
- if (cols == null) {
- cols = new HashMap<>();
- setData.put(row, cols);
- }
-
- cols.put(col, value);
- }
-
- @Override
- public void delete(Bytes row, Column col) {
- Set<Column> cols = deletes.get(row);
- if (cols == null) {
- cols = new HashSet<>();
- deletes.put(row, cols);
- }
-
- cols.add(col);
- }
-
- @Override
- public void setWeakNotification(String row, Column col) {
- setWeakNotification(Bytes.of(row), col);
- }
-
- @Override
- public void set(String row, Column col, String value) throws AlreadySetException {
- set(Bytes.of(row), col, Bytes.of(value));
- }
-
- @Override
- public void delete(String row, Column col) {
- delete(Bytes.of(row), col);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/e84b4f1e/modules/core/src/test/java/org/apache/fluo/core/types/TypeLayerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/fluo/core/types/TypeLayerTest.java b/modules/core/src/test/java/org/apache/fluo/core/types/TypeLayerTest.java
deleted file mode 100644
index 2f5f1db..0000000
--- a/modules/core/src/test/java/org/apache/fluo/core/types/TypeLayerTest.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.types;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedSnapshotBase.Value;
-import org.apache.fluo.api.types.TypedTransactionBase;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TypeLayerTest {
-
- @Test
- public void testColumns() throws Exception {
- TypeLayer tl = new TypeLayer(new StringEncoder());
-
- MockTransactionBase tt =
- new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
- "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
-
- TypedTransactionBase ttx = tl.wrap(tt);
-
- Map<Column, Value> results =
- ttx.get().row("r2")
- .columns(ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7")));
-
- Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
- Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
- Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
- Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
-
- Assert.assertEquals(1, results.size());
-
- results =
- ttx.get()
- .row("r2")
- .columns(
- ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2",
- "8")));
-
- Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
- Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
- Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
- Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
- Assert.assertEquals(13, (int) results.get(new Column("cf2", "8")).toInteger());
- Assert.assertEquals(13, results.get(new Column("cf2", "8")).toInteger(0));
-
- Assert.assertEquals(2, results.size());
-
- // test var args
- Map<Column, Value> results2 =
- ttx.get().row("r2")
- .columns(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2", "8"));
- Assert.assertEquals(results, results2);
- }
-
- @Test
- public void testVis() throws Exception {
- TypeLayer tl = new TypeLayer(new StringEncoder());
-
- MockTransactionBase tt = new MockTransactionBase("r1,cf1:cq1:A,v1", "r1,cf1:cq2:A&B,v2");
-
- TypedTransactionBase ttx = tl.wrap(tt);
-
- Assert.assertNull(ttx.get().row("r1").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A").toString());
- Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A".getBytes())
- .toString());
- Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A"))
- .toString());
- Assert.assertEquals("v1",
- ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A".getBytes())).toString());
-
- Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString());
- Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
- .toString());
- Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
- .toString());
- Assert.assertNull("v1",
- ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
- .toString());
-
- Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString("v3"));
- Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
- .toString("v3"));
- Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
- .toString("v3"));
- Assert.assertEquals(
- "v3",
- ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
- .toString("v3"));
-
- ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").set(3);
- ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).set(4);
- ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).set(5);
- ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).set(7);
-
- Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1:A&B,3", "r1,cf1:cq1:A&C,4",
- "r1,cf1:cq1:A&D,5", "r1,cf1:cq1:A&F,7"), tt.setData);
- tt.setData.clear();
-
- ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").delete();
- ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).delete();
- ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).delete();
- ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).delete();
-
- Assert.assertEquals(MockTransactionBase.toRCM("r1,cf1:cq1:A&B", "r1,cf1:cq1:A&C",
- "r1,cf1:cq1:A&D", "r1,cf1:cq1:A&F"), tt.deletes);
- tt.deletes.clear();
- Assert.assertEquals(0, tt.setData.size());
- Assert.assertEquals(0, tt.weakNotifications.size());
-
- }
-
- @Test
- public void testBuildColumn() {
- TypeLayer tl = new TypeLayer(new StringEncoder());
-
- Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0".getBytes()).qual("q0".getBytes())
- .vis());
- Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0").qual("q0").vis());
- Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5).qual(7).vis());
- Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5l).qual(7l).vis());
- Assert.assertEquals(new Column("5", "7"), tl.bc().fam(Bytes.of("5")).qual(Bytes.of("7")).vis());
- Assert.assertEquals(new Column("5", "7"),
- tl.bc().fam(ByteBuffer.wrap("5".getBytes())).qual(ByteBuffer.wrap("7".getBytes())).vis());
-
- Assert.assertEquals(new Column("f0", "q0", "A&B"),
- tl.bc().fam("f0".getBytes()).qual("q0".getBytes()).vis("A&B"));
- Assert.assertEquals(new Column("f0", "q0", "A&C"),
- tl.bc().fam("f0").qual("q0").vis("A&C".getBytes()));
- Assert.assertEquals(new Column("5", "7", "A&D"), tl.bc().fam(5).qual(7).vis(Bytes.of("A&D")));
- Assert.assertEquals(new Column("5", "7", "A&D"),
- tl.bc().fam(5).qual(7).vis(ByteBuffer.wrap("A&D".getBytes())));
- }
-
- @Test
- public void testRead() throws Exception {
- TypeLayer tl = new TypeLayer(new StringEncoder());
-
- MockSnapshot ms =
- new MockSnapshot("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
- "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20",
- "r3,cf3:cq3,28.195", "r4,cf4:cq4,true");
-
- TypedSnapshot tts = tl.wrap(ms);
-
- Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString("b"));
- Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString());
- Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString("b"));
- Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString());
- Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString("b"));
- Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString());
- Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString("b"));
-
- // try converting to different types
- Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString());
- Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString("b"));
- Assert.assertEquals((Integer) 13, tts.get().row("r2").fam("cf2").qual(8).toInteger());
- Assert.assertEquals(13, tts.get().row("r2").fam("cf2").qual(8).toInteger(14));
- Assert.assertEquals((Long) 13l, tts.get().row("r2").fam("cf2").qual(8).toLong());
- Assert.assertEquals(13l, tts.get().row("r2").fam("cf2").qual(8).toLong(14l));
- Assert.assertEquals("13", new String(tts.get().row("r2").fam("cf2").qual(8).toBytes()));
- Assert.assertEquals("13",
- new String(tts.get().row("r2").fam("cf2").qual(8).toBytes("14".getBytes())));
- Assert
- .assertEquals("13", new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes()));
- Assert.assertEquals("13",
- new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes("14".getBytes())));
- Assert.assertEquals("13",
- Bytes.of(tts.get().row("r2").col(new Column("cf2", "8")).toByteBuffer()).toString());
- Assert.assertEquals(
- "13",
- Bytes.of(
- tts.get().row("r2").col(new Column("cf2", "8"))
- .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
-
- // test non-existent
- Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toInteger());
- Assert.assertEquals(14, tts.get().row("r2").fam("cf3").qual(8).toInteger(14));
- Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toLong());
- Assert.assertEquals(14l, tts.get().row("r2").fam("cf3").qual(8).toLong(14l));
- Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toString());
- Assert.assertEquals("14", tts.get().row("r2").fam("cf3").qual(8).toString("14"));
- Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toBytes());
- Assert.assertEquals("14",
- new String(tts.get().row("r2").fam("cf3").qual(8).toBytes("14".getBytes())));
- Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toBytes());
- Assert.assertEquals("14",
- new String(tts.get().row("r2").col(new Column("cf3", "8")).toBytes("14".getBytes())));
- Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toByteBuffer());
- Assert.assertEquals(
- "14",
- Bytes.of(
- tts.get().row("r2").col(new Column("cf3", "8"))
- .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
-
- // test float & double
- Assert.assertEquals((Float) 28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat());
- Assert.assertEquals(28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat(39.383f), 0.0);
- Assert.assertEquals((Double) 28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble());
- Assert.assertEquals(28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble(39.383d), 0.0);
-
- // test boolean
- Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
- Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
- Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
- Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
-
- // try different types for row
- Assert.assertEquals("20", tts.get().row(13).fam("9").qual("17").toString());
- Assert.assertEquals("20", tts.get().row(13l).fam("9").qual("17").toString());
- Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
- Assert.assertEquals("20", tts.get().row("13".getBytes()).fam("9").qual("17").toString());
- Assert.assertEquals("20", tts.get().row(ByteBuffer.wrap("13".getBytes())).fam("9").qual("17")
- .toString());
-
- // try different types for cf
- Assert.assertEquals("20", tts.get().row("13").fam(9).qual("17").toString());
- Assert.assertEquals("20", tts.get().row("13").fam(9l).qual("17").toString());
- Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
- Assert.assertEquals("20", tts.get().row("13").fam("9".getBytes()).qual("17").toString());
- Assert.assertEquals("20", tts.get().row("13").fam(ByteBuffer.wrap("9".getBytes())).qual("17")
- .toString());
-
- // try different types for cq
- Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
- Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17l).toString());
- Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17).toString());
- Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17".getBytes()).toString());
- Assert.assertEquals("20", tts.get().row("13").fam("9").qual(ByteBuffer.wrap("17".getBytes()))
- .toString());
-
- ms.close();
- tts.close();
- }
-
- @Test
- public void testWrite() throws Exception {
-
- TypeLayer tl = new TypeLayer(new StringEncoder());
-
- MockTransactionBase tt =
- new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
- "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
-
- TypedTransactionBase ttx = tl.wrap(tt);
-
- // test increments data
- ttx.mutate().row("13").fam("9").qual("17").increment(1);
- ttx.mutate().row("13").fam("9").qual(18).increment(2);
- ttx.mutate().row("13").fam("9").qual(19l).increment(3);
- ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4);
- ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5); // increment non existent
- ttx.mutate().row("13").col(new Column("9", "22")).increment(6); // increment non existent
- ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7); // increment
- // non
- // existent
-
- Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
- "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
- tt.setData.clear();
-
- // test increments long
- ttx.mutate().row("13").fam("9").qual("17").increment(1l);
- ttx.mutate().row("13").fam("9").qual(18).increment(2l);
- ttx.mutate().row("13").fam("9").qual(19l).increment(3l);
- ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4l);
- ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5l); // increment non existent
- ttx.mutate().row("13").col(new Column("9", "22")).increment(6l); // increment non existent
- ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7l); // increment
- // non
- // existent
-
- Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
- "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
- tt.setData.clear();
-
- // test setting data
- ttx.mutate().row("13").fam("9").qual("16").set();
- ttx.mutate().row("13").fam("9").qual("17").set(3);
- ttx.mutate().row("13").fam("9").qual(18).set(4l);
- ttx.mutate().row("13").fam("9").qual(19l).set("5");
- ttx.mutate().row("13").fam("9").qual("20".getBytes()).set("6".getBytes());
- ttx.mutate().row("13").col(new Column("9", "21")).set("7".getBytes());
- ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes()))
- .set(ByteBuffer.wrap("8".getBytes()));
- ttx.mutate().row("13").fam("9").qual("23").set(2.54f);
- ttx.mutate().row("13").fam("9").qual("24").set(-6.135d);
- ttx.mutate().row("13").fam("9").qual("25").set(false);
-
- Assert.assertEquals(MockTransactionBase.toRCVM("13,9:16,", "13,9:17,3", "13,9:18,4",
- "13,9:19,5", "13,9:20,6", "13,9:21,7", "13,9:22,8", "13,9:23,2.54", "13,9:24,-6.135",
- "13,9:25,false"), tt.setData);
- tt.setData.clear();
-
- // test deleting data
- ttx.mutate().row("13").fam("9").qual("17").delete();
- ttx.mutate().row("13").fam("9").qual(18).delete();
- ttx.mutate().row("13").fam("9").qual(19l).delete();
- ttx.mutate().row("13").fam("9").qual("20".getBytes()).delete();
- ttx.mutate().row("13").col(new Column("9", "21")).delete();
- ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).delete();
-
- Assert
- .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
- "13,9:21", "13,9:22"), tt.deletes);
- tt.deletes.clear();
- Assert.assertEquals(0, tt.setData.size());
- Assert.assertEquals(0, tt.weakNotifications.size());
-
- // test weak notifications
- ttx.mutate().row("13").fam("9").qual("17").weaklyNotify();
- ttx.mutate().row("13").fam("9").qual(18).weaklyNotify();
- ttx.mutate().row("13").fam("9").qual(19l).weaklyNotify();
- ttx.mutate().row("13").fam("9").qual("20".getBytes()).weaklyNotify();
- ttx.mutate().row("13").col(new Column("9", "21")).weaklyNotify();
- ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).weaklyNotify();
-
- Assert
- .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
- "13,9:21", "13,9:22"), tt.weakNotifications);
- tt.weakNotifications.clear();
- Assert.assertEquals(0, tt.setData.size());
- Assert.assertEquals(0, tt.deletes.size());
- }
-
- @Test
- public void testMultiRow() throws Exception {
- TypeLayer tl = new TypeLayer(new StringEncoder());
-
- MockTransactionBase tt =
- new MockTransactionBase("11,cf1:cq1,1", "11,cf1:cq2,2", "12,cf1:cq1,3", "12,cf1:cq2,4",
- "13,cf1:cq1,5", "13,cf1:cq2,6");
-
- TypedTransactionBase ttx = tl.wrap(tt);
-
- Bytes br1 = Bytes.of("11");
- Bytes br2 = Bytes.of("12");
- Bytes br3 = Bytes.of("13");
-
- Column c1 = new Column("cf1", "cq1");
- Column c2 = new Column("cf1", "cq2");
-
- Map<Bytes, Map<Column, Value>> map1 =
- ttx.get().rows(Arrays.asList(br1, br2)).columns(c1).toBytesMap();
-
- Assert.assertEquals(map1, ttx.get().rows(br1, br2).columns(c1).toBytesMap());
-
- Assert.assertEquals("1", map1.get(br1).get(c1).toString());
- Assert.assertEquals("1", map1.get(br1).get(c1).toString("5"));
- Assert.assertEquals((Long) (1l), map1.get(br1).get(c1).toLong());
- Assert.assertEquals(1l, map1.get(br1).get(c1).toLong(5));
- Assert.assertEquals((Integer) (1), map1.get(br1).get(c1).toInteger());
- Assert.assertEquals(1, map1.get(br1).get(c1).toInteger(5));
-
- Assert.assertEquals("5", map1.get(br3).get(c1).toString("5"));
- Assert.assertNull(map1.get(br3).get(c1).toString());
- Assert.assertEquals(5l, map1.get(br3).get(c1).toLong(5l));
- Assert.assertNull(map1.get(br3).get(c1).toLong());
- Assert.assertEquals(5, map1.get(br1).get(c2).toInteger(5));
- Assert.assertNull(map1.get(br1).get(c2).toInteger());
-
- Assert.assertEquals(2, map1.size());
- Assert.assertEquals(1, map1.get(br1).size());
- Assert.assertEquals(1, map1.get(br2).size());
- Assert.assertEquals("3", map1.get(br2).get(c1).toString());
-
- Map<String, Map<Column, Value>> map2 =
- ttx.get().rowsString(Arrays.asList("11", "13")).columns(c1).toStringMap();
-
- Assert.assertEquals(map2, ttx.get().rowsString("11", "13").columns(c1).toStringMap());
-
- Assert.assertEquals(2, map2.size());
- Assert.assertEquals(1, map2.get("11").size());
- Assert.assertEquals(1, map2.get("13").size());
- Assert.assertEquals((Long) (1l), map2.get("11").get(c1).toLong());
- Assert.assertEquals(5l, map2.get("13").get(c1).toLong(6));
-
- Map<Long, Map<Column, Value>> map3 =
- ttx.get().rowsLong(Arrays.asList(11l, 13l)).columns(c1).toLongMap();
-
- Assert.assertEquals(map3, ttx.get().rowsLong(11l, 13l).columns(c1).toLongMap());
-
- Assert.assertEquals(2, map3.size());
- Assert.assertEquals(1, map3.get(11l).size());
- Assert.assertEquals(1, map3.get(13l).size());
- Assert.assertEquals((Long) (1l), map3.get(11l).get(c1).toLong());
- Assert.assertEquals(5l, map3.get(13l).get(c1).toLong(6));
-
- Map<Integer, Map<Column, Value>> map4 =
- ttx.get().rowsInteger(Arrays.asList(11, 13)).columns(c1).toIntegerMap();
-
- Assert.assertEquals(map4, ttx.get().rowsInteger(11, 13).columns(c1).toIntegerMap());
-
- Assert.assertEquals(2, map4.size());
- Assert.assertEquals(1, map4.get(11).size());
- Assert.assertEquals(1, map4.get(13).size());
- Assert.assertEquals((Long) (1l), map4.get(11).get(c1).toLong());
- Assert.assertEquals(5l, map4.get(13).get(c1).toLong(6));
-
- Map<Integer, Map<Column, Value>> map5 =
- ttx.get().rowsBytes(Arrays.asList("11".getBytes(), "13".getBytes())).columns(c1)
- .toIntegerMap();
-
- Assert.assertEquals(map5, ttx.get().rowsBytes("11".getBytes(), "13".getBytes()).columns(c1)
- .toIntegerMap());
-
- Assert.assertEquals(2, map5.size());
- Assert.assertEquals(1, map5.get(11).size());
- Assert.assertEquals(1, map5.get(13).size());
- Assert.assertEquals((Long) (1l), map5.get(11).get(c1).toLong());
- Assert.assertEquals(5l, map5.get(13).get(c1).toLong(6));
-
- Map<Integer, Map<Column, Value>> map6 =
- ttx.get()
- .rowsByteBuffers(
- Arrays.asList(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes())))
- .columns(c1).toIntegerMap();
-
- Assert.assertEquals(
- map6,
- ttx.get()
- .rowsByteBuffers(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes()))
- .columns(c1).toIntegerMap());
-
- Assert.assertEquals(2, map6.size());
- Assert.assertEquals(1, map6.get(11).size());
- Assert.assertEquals(1, map6.get(13).size());
- Assert.assertEquals((Long) (1l), map6.get(11).get(c1).toLong());
- Assert.assertEquals(5l, map6.get(13).get(c1).toLong(6));
-
- }
-
- @Test
- public void testBasic() throws Exception {
- TypeLayer tl = new TypeLayer(new StringEncoder());
-
- MockTransactionBase tt =
- new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
- "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
-
- TypedTransactionBase ttx = tl.wrap(tt);
-
- Assert.assertEquals(Bytes.of("12"), ttx.get(Bytes.of("r2"), new Column("cf2", "7")));
- Assert.assertNull(ttx.get(Bytes.of("r2"), new Column("cf2", "9")));
-
- Map<Column, Bytes> map =
- ttx.get(Bytes.of("r2"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
- Assert.assertEquals(2, map.size());
- Assert.assertEquals("12", map.get(new Column("cf2", "7")).toString());
- Assert.assertEquals("13", map.get(new Column("cf2", "8")).toString());
-
- map = ttx.get(Bytes.of("r6"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
- Assert.assertEquals(0, map.size());
-
- ttx.set(Bytes.of("r6"), new Column("cf2", "7"), Bytes.of("3"));
- Assert.assertEquals(MockTransactionBase.toRCVM("r6,cf2:7,3"), tt.setData);
- tt.setData.clear();
-
- Map<Bytes, Map<Column, Bytes>> map2 =
- ttx.get(ImmutableSet.of(Bytes.of("r1"), Bytes.of("r2")),
- ImmutableSet.of(new Column("cf1", "cq1"), new Column("cf2", "8")));
- Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1,v1", "r2,cf2:8,13"), map2);
-
- ttx.delete(Bytes.of("r6"), new Column("cf2", "7"));
- Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:7"), tt.deletes);
- tt.deletes.clear();
-
- ttx.setWeakNotification(Bytes.of("r6"), new Column("cf2", "8"));
- Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:8"), tt.weakNotifications);
- tt.weakNotifications.clear();
-
- }
-}
[2/3] incubator-fluo git commit: #696 - Updated integration tests to
use core API instead of type layer
Posted by mw...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
index 6264375..554a762 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ParallelScannerIT.java
@@ -16,14 +16,13 @@
package org.apache.fluo.integration.impl;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedSnapshotBase.Value;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.impl.TransactorNode;
import org.apache.fluo.core.oracle.Stamp;
@@ -33,7 +32,6 @@ import org.junit.Assert;
import org.junit.Test;
public class ParallelScannerIT extends ITBaseImpl {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
@Test
public void testRowColumn() {
@@ -76,21 +74,21 @@ public class ParallelScannerIT extends ITBaseImpl {
// parallel scan
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("bob9").fam("vote").qual("election1").set("N");
- tx1.mutate().row("bob9").fam("vote").qual("election2").set("Y");
+ tx1.set("bob9", new Column("vote", "election1"), "N");
+ tx1.set("bob9", new Column("vote", "election2"), "Y");
- tx1.mutate().row("joe3").fam("vote").qual("election1").set("nay");
- tx1.mutate().row("joe3").fam("vote").qual("election2").set("nay");
+ tx1.set("joe3", new Column("vote", "election1"), "nay");
+ tx1.set("joe3", new Column("vote", "election2"), "nay");
tx1.done();
final TestTransaction tx2 = new TestTransaction(env);
- tx2.mutate().row("sue4").fam("vote").qual("election1").set("+1");
- tx2.mutate().row("sue4").fam("vote").qual("election2").set("-1");
+ tx2.set("sue4", new Column("vote", "election1"), "+1");
+ tx2.set("sue4", new Column("vote", "election2"), "-1");
- tx2.mutate().row("eve2").fam("vote").qual("election1").set("no");
- tx2.mutate().row("eve2").fam("vote").qual("election2").set("no");
+ tx2.set("eve2", new Column("vote", "election1"), "no");
+ tx2.set("eve2", new Column("vote", "election2"), "no");
final CommitData cd2 = tx2.createCommitData();
Assert.assertTrue(tx2.preCommit(cd2));
@@ -116,17 +114,17 @@ public class ParallelScannerIT extends ITBaseImpl {
TestTransaction tx3 = new TestTransaction(env);
- Column e1Col = typeLayer.bc().fam("vote").qual("election1").vis();
+ Column e1Col = new Column("vote", "election1");
// normally when this test runs, some of the row/columns being read below will be locked for a
// bit
- Map<String, Map<Column, Value>> votes =
- tx3.get().rowsString("bob9", "joe3", "sue4", "eve2").columns(e1Col).toStringMap();
+ Map<String, Map<Column, String>> votes =
+ tx3.gets(Arrays.asList("bob9", "joe3", "sue4", "eve2"), Sets.newHashSet(e1Col));
- Assert.assertEquals("N", votes.get("bob9").get(e1Col).toString(""));
- Assert.assertEquals("nay", votes.get("joe3").get(e1Col).toString(""));
- Assert.assertEquals("+1", votes.get("sue4").get(e1Col).toString(""));
- Assert.assertEquals("no", votes.get("eve2").get(e1Col).toString(""));
+ Assert.assertEquals("N", votes.get("bob9").get(e1Col));
+ Assert.assertEquals("nay", votes.get("joe3").get(e1Col));
+ Assert.assertEquals("+1", votes.get("sue4").get(e1Col));
+ Assert.assertEquals("no", votes.get("eve2").get(e1Col));
Assert.assertEquals(4, votes.size());
}
@@ -140,16 +138,18 @@ public class ParallelScannerIT extends ITBaseImpl {
runParallelRecoveryTest(false);
}
- void runParallelRecoveryTest(boolean closeTransID) throws Exception {
+ private static final Column COL = new Column("7", "7");
+
+ private void runParallelRecoveryTest(boolean closeTransID) throws Exception {
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row(5).fam(7).qual(7).set(3);
- tx1.mutate().row(12).fam(7).qual(7).set(10);
- tx1.mutate().row(19).fam(7).qual(7).set(17);
- tx1.mutate().row(26).fam(7).qual(7).set(24);
- tx1.mutate().row(33).fam(7).qual(7).set(31);
- tx1.mutate().row(40).fam(7).qual(7).set(38);
- tx1.mutate().row(47).fam(7).qual(7).set(45);
+ tx1.set("5", COL, "3");
+ tx1.set("12", COL, "10");
+ tx1.set("19", COL, "17");
+ tx1.set("26", COL, "24");
+ tx1.set("33", COL, "31");
+ tx1.set("40", COL, "38");
+ tx1.set("47", COL, "45");
tx1.done();
@@ -157,18 +157,18 @@ public class ParallelScannerIT extends ITBaseImpl {
TestTransaction tx2 = new TestTransaction(env, tNode1);
- tx2.mutate().row(5).fam(7).qual(7).set(7);
- tx2.mutate().row(12).fam(7).qual(7).set(14);
- tx2.mutate().row(19).fam(7).qual(7).set(21);
+ tx2.set("5", COL, "7");
+ tx2.set("12", COL, "14");
+ tx2.set("19", COL, "21");
CommitData cd2 = tx2.createCommitData();
Assert.assertTrue(tx2.preCommit(cd2));
TestTransaction tx3 = new TestTransaction(env, tNode1);
- tx3.mutate().row(26).fam(7).qual(7).set(28);
- tx3.mutate().row(33).fam(7).qual(7).set(35);
- tx3.mutate().row(40).fam(7).qual(7).set(42);
+ tx3.set("26", COL, "28");
+ tx3.set("33", COL, "35");
+ tx3.set("40", COL, "42");
CommitData cd3 = tx3.createCommitData();
Assert.assertTrue(tx3.preCommit(cd3));
@@ -187,24 +187,23 @@ public class ParallelScannerIT extends ITBaseImpl {
}
}
- void check() throws Exception {
+ private void check() throws Exception {
TestTransaction tx = new TestTransaction(env);
- Column scol = typeLayer.bc().fam(7).qual(7).vis();
- Map<String, Map<Column, Value>> votes =
- tx.get().rowsString("5", "12", "19", "26", "33", "40", "47").columns(scol).toStringMap();
+ Map<String, Map<Column, String>> votes =
+ tx.gets(Arrays.asList("5", "12", "19", "26", "33", "40", "47"), Sets.newHashSet(COL));
// following should be rolled back
- Assert.assertEquals(3, votes.get("5").get(scol).toInteger(0));
- Assert.assertEquals(10, votes.get("12").get(scol).toInteger(0));
- Assert.assertEquals(17, votes.get("19").get(scol).toInteger(0));
+ Assert.assertEquals("3", votes.get("5").get(COL));
+ Assert.assertEquals("10", votes.get("12").get(COL));
+ Assert.assertEquals("17", votes.get("19").get(COL));
// following should be rolled forward
- Assert.assertEquals(28, votes.get("26").get(scol).toInteger(0));
- Assert.assertEquals(35, votes.get("33").get(scol).toInteger(0));
- Assert.assertEquals(42, votes.get("40").get(scol).toInteger(0));
+ Assert.assertEquals("28", votes.get("26").get(COL));
+ Assert.assertEquals("35", votes.get("33").get(COL));
+ Assert.assertEquals("42", votes.get("40").get(COL));
// unchanged and not locked
- Assert.assertEquals(45, votes.get("47").get(scol).toInteger(0));
+ Assert.assertEquals("45", votes.get("47").get(COL));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
index 1b99abc..4015339 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
@@ -20,15 +20,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedTransaction;
-import org.apache.fluo.api.types.TypedTransactionBase;
import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
import org.junit.Test;
@@ -38,43 +35,38 @@ import org.junit.Test;
*/
public class SelfNotificationIT extends ITBaseMini {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
-
- static final Column STAT_COUNT_COL = typeLayer.bc().fam("stat").qual("count").vis();
- static final Column EXPORT_CHECK_COL = typeLayer.bc().fam("export").qual("check").vis();
- static final Column EXPORT_COUNT_COL = typeLayer.bc().fam("export").qual("count").vis();
+ private static final Column STAT_COUNT_COL = new Column("stat", "count");
+ private static final Column EXPORT_CHECK_COL = new Column("export", "check");
+ private static final Column EXPORT_COUNT_COL = new Column("export", "count");
@Override
protected List<ObserverConfiguration> getObservers() {
return Collections.singletonList(new ObserverConfiguration(ExportingObserver.class.getName()));
}
- static List<Integer> exports = new ArrayList<>();
+ private static List<String> exports = new ArrayList<>();
public static class ExportingObserver extends AbstractObserver {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-
- TypedTransactionBase ttx = typeLayer.wrap(tx);
-
- Integer currentCount = ttx.get().row(row).col(STAT_COUNT_COL).toInteger();
- Integer exportCount = ttx.get().row(row).col(EXPORT_COUNT_COL).toInteger();
+ String r = row.toString();
+ String currentCount = tx.gets(r, STAT_COUNT_COL);
+ String exportCount = tx.gets(r, EXPORT_COUNT_COL);
if (exportCount != null) {
export(row, exportCount);
if (currentCount == null || exportCount.equals(currentCount)) {
- ttx.mutate().row(row).col(EXPORT_COUNT_COL).delete();
+ tx.delete(row, EXPORT_COUNT_COL);
} else {
- ttx.mutate().row(row).col(EXPORT_COUNT_COL).set(currentCount);
- ttx.mutate().row(row).col(EXPORT_CHECK_COL).set();
+ tx.set(r, EXPORT_COUNT_COL, currentCount);
+ tx.set(r, EXPORT_CHECK_COL, "");
}
-
}
}
- private void export(Bytes row, Integer exportCount) {
+ private void export(Bytes row, String exportCount) {
exports.add(exportCount);
}
@@ -87,36 +79,35 @@ public class SelfNotificationIT extends ITBaseMini {
@Test
public void test1() throws Exception {
- try (TypedTransaction tx1 = typeLayer.wrap(client.newTransaction())) {
- tx1.mutate().row("r1").col(STAT_COUNT_COL).set(3);
- tx1.mutate().row("r1").col(EXPORT_CHECK_COL).set();
- tx1.mutate().row("r1").col(EXPORT_COUNT_COL).set(3);
+ try (Transaction tx1 = client.newTransaction()) {
+ tx1.set("r1", STAT_COUNT_COL, "3");
+ tx1.set("r1", EXPORT_CHECK_COL, "");
+ tx1.set("r1", EXPORT_COUNT_COL, "3");
tx1.commit();
}
miniFluo.waitForObservers();
- Assert.assertEquals(Collections.singletonList(3), exports);
+ Assert.assertEquals(Collections.singletonList("3"), exports);
exports.clear();
miniFluo.waitForObservers();
Assert.assertEquals(0, exports.size());
- try (TypedTransaction tx2 = typeLayer.wrap(client.newTransaction())) {
- Assert.assertNull(tx2.get().row("r1").col(EXPORT_COUNT_COL).toInteger());
+ try (Transaction tx2 = client.newTransaction()) {
+ Assert.assertNull(tx2.gets("r1", EXPORT_COUNT_COL));
- tx2.mutate().row("r1").col(STAT_COUNT_COL).set(5);
- tx2.mutate().row("r1").col(EXPORT_CHECK_COL).set();
- tx2.mutate().row("r1").col(EXPORT_COUNT_COL).set(4);
+ tx2.set("r1", STAT_COUNT_COL, "5");
+ tx2.set("r1", EXPORT_CHECK_COL, "");
+ tx2.set("r1", EXPORT_COUNT_COL, "4");
tx2.commit();
}
miniFluo.waitForObservers();
- Assert.assertEquals(Arrays.asList(4, 5), exports);
+ Assert.assertEquals(Arrays.asList("4", "5"), exports);
exports.clear();
miniFluo.waitForObservers();
Assert.assertEquals(0, exports.size());
-
}
// TODO test self notification w/ weak notifications
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
index 1b19143..720bfd8 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StochasticBankIT.java
@@ -40,8 +40,6 @@ import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
@@ -55,7 +53,6 @@ import org.junit.Test;
*/
public class StochasticBankIT extends ITBaseImpl {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
private static AtomicInteger txCount = new AtomicInteger();
@Test
@@ -100,13 +97,13 @@ public class StochasticBankIT extends ITBaseImpl {
runVerifier(env, numAccounts, 1);
}
- private static Column balanceCol = typeLayer.bc().fam("data").qual("balance").vis();
+ private static Column balanceCol = new Column("data", "balance");
private static void populate(Environment env, int numAccounts) throws Exception {
TestTransaction tx = new TestTransaction(env);
for (int i = 0; i < numAccounts; i++) {
- tx.mutate().row(fmtAcct(i)).col(balanceCol).set(1000);
+ tx.set(fmtAcct(i), balanceCol, "1000");
}
tx.done();
@@ -155,12 +152,12 @@ public class StochasticBankIT extends ITBaseImpl {
while (true) {
try {
TestTransaction tx = new TestTransaction(env);
- int bal1 = tx.get().row(from).col(balanceCol).toInteger();
- int bal2 = tx.get().row(to).col(balanceCol).toInteger();
+ int bal1 = Integer.parseInt(tx.gets(from, balanceCol));
+ int bal2 = Integer.parseInt(tx.gets(to, balanceCol));
if (bal1 - amt >= 0) {
- tx.mutate().row(from).col(balanceCol).set(bal1 - amt);
- tx.mutate().row(to).col(balanceCol).set(bal2 + amt);
+ tx.set(from, balanceCol, (bal1 - amt) + "");
+ tx.set(to, balanceCol, (bal2 + amt) + "");
} else {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
index 9142f8e..ea5921c 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
@@ -19,6 +19,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.config.ScannerConfiguration;
@@ -28,31 +29,28 @@ import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedTransaction;
-import org.apache.fluo.api.types.TypedTransactionBase;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestTransaction;
+import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Test;
public class WeakNotificationIT extends ITBaseMini {
- private static TypeLayer tl = new TypeLayer(new StringEncoder());
+ private static final Column STAT_COUNT = new Column("stat", "count");
+ private static final Column STAT_CHECK = new Column("stat", "check");
public static class SimpleObserver extends AbstractObserver {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
- TypedTransactionBase ttx = tl.wrap(tx);
ScannerConfiguration sc = new ScannerConfiguration();
sc.setSpan(Span.exact(row, new Column(Bytes.of("stats"))));
- RowIterator rowIter = ttx.get(sc);
+ RowIterator rowIter = tx.get(sc);
int sum = 0;
@@ -61,19 +59,19 @@ public class WeakNotificationIT extends ITBaseMini {
while (colIter.hasNext()) {
Entry<Column, Bytes> colVal = colIter.next();
sum += Integer.parseInt(colVal.getValue().toString());
- ttx.delete(row, colVal.getKey());
+ tx.delete(row, colVal.getKey());
}
}
if (sum != 0) {
- sum += ttx.get().row(row).fam("stat").qual("count").toInteger(0);
- ttx.mutate().row(row).fam("stat").qual("count").set(sum);
+ sum += TestUtil.getOrDefault(tx, row.toString(), STAT_COUNT, 0);
+ tx.set(row.toString(), STAT_COUNT, sum + "");
}
}
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(tl.bc().fam("stat").qual("check").vis(), NotificationType.WEAK);
+ return new ObservedColumn(STAT_CHECK, NotificationType.WEAK);
}
}
@@ -87,34 +85,34 @@ public class WeakNotificationIT extends ITBaseMini {
Environment env = new Environment(config);
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("r1").fam("stat").qual("count").set(3);
+ tx1.set("r1", STAT_COUNT, "3");
tx1.done();
TestTransaction tx2 = new TestTransaction(env);
- tx2.mutate().row("r1").fam("stats").qual("af89").set(5);
- tx2.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx2.set("r1", new Column("stats", "af89"), "5");
+ tx2.setWeakNotification("r1", STAT_CHECK);
tx2.done();
TestTransaction tx3 = new TestTransaction(env);
- tx3.mutate().row("r1").fam("stats").qual("af99").set(7);
- tx3.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx3.set("r1", new Column("stats", "af99"), "7");
+ tx3.setWeakNotification("r1", STAT_CHECK);
tx3.done();
miniFluo.waitForObservers();
TestTransaction tx4 = new TestTransaction(env);
- Assert.assertEquals(15, tx4.get().row("r1").fam("stat").qual("count").toInteger(0));
+ Assert.assertEquals("15", tx4.gets("r1", STAT_COUNT));
// overlapping transactions that set a weak notification should commit w/ no problem
TestTransaction tx5 = new TestTransaction(env);
- tx5.mutate().row("r1").fam("stats").qual("bff7").set(11);
- tx5.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx5.set("r1", new Column("stats", "bff7"), "11");
+ tx5.setWeakNotification("r1", STAT_CHECK);
CommitData cd5 = tx5.createCommitData();
Assert.assertTrue(tx5.preCommit(cd5));
TestTransaction tx6 = new TestTransaction(env);
- tx6.mutate().row("r1").fam("stats").qual("bff0").set(13);
- tx6.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx6.set("r1", new Column("stats", "bff0"), "13");
+ tx6.setWeakNotification("r1", STAT_CHECK);
CommitData cd6 = tx6.createCommitData();
Assert.assertTrue(tx6.preCommit(cd6));
@@ -130,7 +128,7 @@ public class WeakNotificationIT extends ITBaseMini {
miniFluo.waitForObservers();
TestTransaction tx7 = new TestTransaction(env);
- Assert.assertEquals(39, tx7.get().row("r1").fam("stat").qual("count").toInteger(0));
+ Assert.assertEquals("39", tx7.gets("r1", STAT_COUNT));
env.close();
}
@@ -139,9 +137,9 @@ public class WeakNotificationIT extends ITBaseMini {
public void testNOOP() throws Exception {
// if an observer makes not updates in a transaction, it should still delete the weak
// notification
- try (TypedTransaction tx1 = tl.wrap(client.newTransaction())) {
- tx1.mutate().row("r1").fam("stat").qual("count").set(3);
- tx1.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ try (Transaction tx1 = client.newTransaction()) {
+ tx1.set("r1", STAT_COUNT, "3");
+ tx1.setWeakNotification("r1", STAT_CHECK);
tx1.commit();
}
@@ -151,9 +149,9 @@ public class WeakNotificationIT extends ITBaseMini {
@Test(expected = IllegalArgumentException.class)
public void testBadColumn() throws Exception {
- try (TypedTransaction tx1 = tl.wrap(client.newTransaction())) {
- tx1.mutate().row("r1").fam("stat").qual("count").set(3);
- tx1.mutate().row("r1").fam("stat").qual("foo").weaklyNotify();
+ try (Transaction tx1 = client.newTransaction()) {
+ tx1.set("r1", STAT_COUNT, "3");
+ tx1.setWeakNotification("r1", new Column("stat", "foo"));
tx1.commit();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
index 4ccfd3d..07ff45f 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
@@ -15,59 +15,63 @@
package org.apache.fluo.integration.impl;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import com.google.common.primitives.Ints;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedObserver;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
+import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Test;
public class WeakNotificationOverlapIT extends ITBaseImpl {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
+ private static final Column STAT_TOTAL = new Column("stat", "total");
+ private static final Column STAT_PROCESSED = new Column("stat", "processed");
+ private static final Column STAT_CHANGED = new Column("stat", "changed");
- public static class TotalObserver extends TypedObserver {
+ public static class TotalObserver extends AbstractObserver {
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(typeLayer.bc().fam("stat").qual("changed").vis(),
- NotificationType.WEAK);
+ return new ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
- Integer total = tx.get().row(row).fam("stat").qual("total").toInteger();
- if (total == null) {
+ public void process(TransactionBase tx, Bytes row, Column col) {
+ String r = row.toString();
+ String totalStr = tx.gets(r, STAT_TOTAL);
+ if (totalStr == null) {
return;
}
- int processed = tx.get().row(row).fam("stat").qual("processed").toInteger(0);
-
- tx.mutate().row(row).fam("stat").qual("processed").set(total);
- tx.mutate().row("all").fam("stat").qual("total").increment(total - processed);
+ Integer total = Integer.parseInt(totalStr);
+ int processed = TestUtil.getOrDefault(tx, r, STAT_PROCESSED, 0);
+ tx.set(r, new Column("stat", "processed"), total + "");
+ TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
}
}
+
+
@Override
protected List<ObserverConfiguration> getObservers() {
- return Arrays.asList(new ObserverConfiguration(TotalObserver.class.getName()));
+ return Collections.singletonList(new ObserverConfiguration(TotalObserver.class.getName()));
}
@Test
@@ -75,80 +79,78 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
// this test ensures that processing of weak notification deletes based on startTs and not
// commitTs
- Column ntfyCol = typeLayer.bc().fam("stat").qual("changed").vis();
-
TestTransaction ttx1 = new TestTransaction(env);
- ttx1.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx1.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx1, "1", STAT_TOTAL, 1);
+ ttx1.setWeakNotification("1", STAT_CHANGED);
ttx1.done();
- TestTransaction ttx2 = new TestTransaction(env, "1", ntfyCol);
+ TestTransaction ttx2 = new TestTransaction(env, "1", STAT_CHANGED);
TestTransaction ttx3 = new TestTransaction(env);
- ttx3.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx3.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx3, "1", STAT_TOTAL, 1);
+ ttx3.setWeakNotification("1", STAT_CHANGED);
ttx3.done();
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx2, Bytes.of("1"), ntfyCol);
+ new TotalObserver().process(ttx2, Bytes.of("1"), STAT_CHANGED);
// should not delete notification created by ttx3
ttx2.done();
TestTransaction snap1 = new TestTransaction(env);
- Assert.assertEquals(1, snap1.get().row("all").fam("stat").qual("total").toInteger(-1));
+ Assert.assertEquals("1", snap1.gets("all", STAT_TOTAL));
snap1.done();
Assert.assertEquals(1, countNotifications());
- TestTransaction ttx4 = new TestTransaction(env, "1", ntfyCol);
- new TotalObserver().process(ttx4, Bytes.of("1"), ntfyCol);
+ TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
+ new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
ttx4.done();
Assert.assertEquals(0, countNotifications());
TestTransaction snap2 = new TestTransaction(env);
- Assert.assertEquals(2, snap2.get().row("all").fam("stat").qual("total").toInteger(-1));
+ Assert.assertEquals("2", snap2.gets("all", STAT_TOTAL));
snap2.done();
// the following code is a repeat of the above with a slight diff. The following tx creates a
// notification, but deletes the data so there is no work for the
// observer. This test the case where a observer deletes a notification w/o making any updates.
TestTransaction ttx5 = new TestTransaction(env);
- ttx5.mutate().row(1).fam("stat").qual("total").delete();
- ttx5.mutate().row(1).fam("stat").qual("processed").delete();
- ttx5.mutate().row(1).col(ntfyCol).weaklyNotify();
+ ttx5.delete("1", STAT_TOTAL);
+ ttx5.delete("1", STAT_PROCESSED);
+ ttx5.setWeakNotification("1", STAT_CHANGED);
ttx5.done();
Assert.assertEquals(1, countNotifications());
- TestTransaction ttx6 = new TestTransaction(env, "1", ntfyCol);
+ TestTransaction ttx6 = new TestTransaction(env, "1", STAT_CHANGED);
TestTransaction ttx7 = new TestTransaction(env);
- ttx7.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx7.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx7, "1", STAT_TOTAL, 1);
+ ttx7.setWeakNotification("1", STAT_CHANGED);
ttx7.done();
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx6, Bytes.of("1"), ntfyCol);
+ new TotalObserver().process(ttx6, Bytes.of("1"), STAT_CHANGED);
// should not delete notification created by ttx7
ttx6.done();
Assert.assertEquals(1, countNotifications());
TestTransaction snap3 = new TestTransaction(env);
- Assert.assertEquals(2, snap3.get().row("all").fam("stat").qual("total").toInteger(-1));
+ Assert.assertEquals("2", snap3.gets("all", STAT_TOTAL));
snap3.done();
- TestTransaction ttx8 = new TestTransaction(env, "1", ntfyCol);
- new TotalObserver().process(ttx8, Bytes.of("1"), ntfyCol);
+ TestTransaction ttx8 = new TestTransaction(env, "1", STAT_CHANGED);
+ new TotalObserver().process(ttx8, Bytes.of("1"), STAT_CHANGED);
ttx8.done();
Assert.assertEquals(0, countNotifications());
TestTransaction snap4 = new TestTransaction(env);
- Assert.assertEquals(3, snap4.get().row("all").fam("stat").qual("total").toInteger(-1));
+ Assert.assertEquals("3", snap4.gets("all", STAT_TOTAL));
snap4.done();
}
@@ -156,25 +158,23 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
public void testOverlap2() throws Exception {
// this test ensures that setting weak notification is based on commitTs and not startTs
- Column ntfyCol = typeLayer.bc().fam("stat").qual("changed").vis();
-
TestTransaction ttx1 = new TestTransaction(env);
- ttx1.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx1.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx1, "1", STAT_TOTAL, 1);
+ ttx1.setWeakNotification("1", STAT_CHANGED);
ttx1.done();
Assert.assertEquals(1, countNotifications());
TestTransaction ttx2 = new TestTransaction(env);
- ttx2.mutate().row(1).fam("stat").qual("total").increment(1);
- ttx2.mutate().row(1).col(ntfyCol).weaklyNotify();
+ TestUtil.increment(ttx2, "1", STAT_TOTAL, 1);
+ ttx2.setWeakNotification("1", STAT_CHANGED);
CommitData cd2 = ttx2.createCommitData();
Assert.assertTrue(ttx2.preCommit(cd2));
// simulate an observer processing the notification created by ttx1 while ttx2 is in the middle
// of committing. Processing this observer should not delete
// the notification for ttx2. It should delete the notification for ttx1.
- TestTransaction ttx3 = new TestTransaction(env, "1", ntfyCol);
+ TestTransaction ttx3 = new TestTransaction(env, "1", STAT_CHANGED);
Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
Assert.assertTrue(ttx2.commitPrimaryColumn(cd2, commitTs));
@@ -183,21 +183,21 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx3, Bytes.of("1"), ntfyCol);
+ new TotalObserver().process(ttx3, Bytes.of("1"), STAT_CHANGED);
ttx3.done();
Assert.assertEquals(1, countNotifications());
- try (TypedSnapshot snapshot = typeLayer.wrap(client.newSnapshot())) {
- Assert.assertEquals(1, snapshot.get().row("all").fam("stat").qual("total").toInteger(-1));
+ try (Snapshot snapshot = client.newSnapshot()) {
+ Assert.assertEquals("1", snapshot.gets("all", STAT_TOTAL));
}
- TestTransaction ttx4 = new TestTransaction(env, "1", ntfyCol);
- new TotalObserver().process(ttx4, Bytes.of("1"), ntfyCol);
+ TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
+ new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
ttx4.done();
Assert.assertEquals(0, countNotifications());
- try (TypedSnapshot snapshot = typeLayer.wrap(client.newSnapshot())) {
- Assert.assertEquals(2, snapshot.get().row("all").fam("stat").qual("total").toInteger(-1));
+ try (Snapshot snapshot = client.newSnapshot()) {
+ Assert.assertEquals("2", snapshot.gets("all", STAT_TOTAL));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index c89a91a..e4204aa 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -18,6 +18,8 @@ package org.apache.fluo.integration.impl;
import java.util.Collections;
import java.util.List;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.config.ScannerConfiguration;
@@ -28,11 +30,6 @@ import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransaction;
-import org.apache.fluo.api.types.TypedTransactionBase;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.worker.NotificationFinder;
@@ -50,9 +47,10 @@ import org.junit.Test;
*/
public class WorkerIT extends ITBaseMini {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
+ private static final Column LAST_UPDATE = new Column("attr", "lastupdate");
+ private static final Column DEGREE = new Column("attr", "degree");
- private static Column observedColumn = typeLayer.bc().fam("attr").qual("lastupdate").vis();
+ private static Column observedColumn = LAST_UPDATE;
@Override
protected List<ObserverConfiguration> getObservers() {
@@ -65,17 +63,17 @@ public class WorkerIT extends ITBaseMini {
public void init(Context context) {}
@Override
- public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+ public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
- TypedTransactionBase ttx = typeLayer.wrap(tx);
+ String row = rowBytes.toString();
// get previously calculated degree
- String degree = ttx.get().row(row).fam("attr").qual("degree").toString();
+ String degree = tx.gets(row, DEGREE);
// calculate new degree
int count = 0;
RowIterator riter =
- ttx.get(new ScannerConfiguration().setSpan(Span.exact(row, new Column("link"))));
+ tx.get(new ScannerConfiguration().setSpan(Span.exact(row, new Column("link"))));
while (riter.hasNext()) {
ColumnIterator citer = riter.next().getValue();
while (citer.hasNext()) {
@@ -86,15 +84,15 @@ public class WorkerIT extends ITBaseMini {
String degree2 = "" + count;
if (degree == null || !degree.equals(degree2)) {
- ttx.mutate().row(row).fam("attr").qual("degree").set(degree2);
+ tx.set(row, DEGREE, degree2);
// put new entry in degree index
- ttx.mutate().row("IDEG" + degree2).fam("node").qual(row).set("");
+ tx.set("IDEG" + degree2, new Column("node", row), "");
}
if (degree != null) {
// delete old degree in index
- ttx.mutate().row("IDEG" + degree).fam("node").qual(row).delete();
+ tx.delete("IDEG" + degree, new Column("node", row));
}
}
@@ -119,12 +117,12 @@ public class WorkerIT extends ITBaseMini {
// verify observer updated degree index
TestTransaction tx3 = new TestTransaction(env);
- Assert.assertEquals(2, tx3.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertEquals("", tx3.get().row("IDEG2").fam("node").qual("N0003").toString());
+ Assert.assertEquals("2", tx3.gets("N0003", DEGREE));
+ Assert.assertEquals("", tx3.gets("IDEG2", new Column("node", "N0003")));
// add a link between two nodes in a graph
- tx3.mutate().row("N0003").fam("link").qual("N0010").set("");
- tx3.mutate().row("N0003").fam("attr").qual("lastupdate").set(System.currentTimeMillis());
+ tx3.set("N0003", new Column("link", "N0010"), "");
+ tx3.set("N0003", LAST_UPDATE, System.currentTimeMillis() + "");
tx3.done();
miniFluo.waitForObservers();
@@ -132,28 +130,28 @@ public class WorkerIT extends ITBaseMini {
// verify observer updated degree index. Should have deleted old index entry
// and added a new one
TestTransaction tx4 = new TestTransaction(env);
- Assert.assertEquals(3, tx4.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertNull("", tx4.get().row("IDEG2").fam("node").qual("N0003").toString());
- Assert.assertEquals("", tx4.get().row("IDEG3").fam("node").qual("N0003").toString());
+ Assert.assertEquals("3", tx4.gets("N0003", DEGREE));
+ Assert.assertNull("", tx4.gets("IDEG2", new Column("node", "N0003")));
+ Assert.assertEquals("", tx4.gets("IDEG3", new Column("node", "N0003")));
// test rollback
TestTransaction tx5 = new TestTransaction(env);
- tx5.mutate().row("N0003").fam("link").qual("N0030").set("");
- tx5.mutate().row("N0003").fam("attr").qual("lastupdate").set(System.currentTimeMillis());
+ tx5.set("N0003", new Column("link", "N0030"), "");
+ tx5.set("N0003", LAST_UPDATE, System.currentTimeMillis() + "");
tx5.done();
TestTransaction tx6 = new TestTransaction(env);
- tx6.mutate().row("N0003").fam("link").qual("N0050").set("");
- tx6.mutate().row("N0003").fam("attr").qual("lastupdate").set(System.currentTimeMillis());
+ tx6.set("N0003", new Column("link", "N0050"), "");
+ tx6.set("N0003", LAST_UPDATE, System.currentTimeMillis() + "");
CommitData cd = tx6.createCommitData();
- tx6.preCommit(cd, new RowColumn("N0003", new Column("attr", "lastupdate")));
+ tx6.preCommit(cd, new RowColumn("N0003", LAST_UPDATE));
miniFluo.waitForObservers();
TestTransaction tx7 = new TestTransaction(env);
- Assert.assertEquals(4, tx7.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertNull("", tx7.get().row("IDEG3").fam("node").qual("N0003").toString());
- Assert.assertEquals("", tx7.get().row("IDEG4").fam("node").qual("N0003").toString());
+ Assert.assertEquals("4", tx7.gets("N0003", DEGREE));
+ Assert.assertNull("", tx7.gets("IDEG3", new Column("node", "N0003")));
+ Assert.assertEquals("", tx7.gets("IDEG4", new Column("node", "N0003")));
env.close();
}
@@ -163,11 +161,10 @@ public class WorkerIT extends ITBaseMini {
*/
@Test
public void testDiffObserverConfig() throws Exception {
- Column old = observedColumn;
- observedColumn = typeLayer.bc().fam("attr2").qual("lastupdate").vis();
+ observedColumn = new Column("attr2", "lastupdate");
try {
try (Environment env = new Environment(config); Observers observers = new Observers(env)) {
- observers.getObserver(typeLayer.bc().fam("attr").qual("lastupdate").vis());
+ observers.getObserver(LAST_UPDATE);
}
Assert.fail();
@@ -176,14 +173,14 @@ public class WorkerIT extends ITBaseMini {
Assert.assertTrue(ise.getMessage().contains(
"Mismatch between configured column and class column"));
} finally {
- observedColumn = old;
+ observedColumn = LAST_UPDATE;
}
}
private void addLink(String from, String to) {
- try (TypedTransaction tx = typeLayer.wrap(client.newTransaction())) {
- tx.mutate().row(from).fam("link").qual(to).set("");
- tx.mutate().row(from).fam("attr").qual("lastupdate").set(System.currentTimeMillis());
+ try (Transaction tx = client.newTransaction()) {
+ tx.set(from, new Column("link", to), "");
+ tx.set(from, LAST_UPDATE, System.currentTimeMillis() + "");
tx.commit();
}
}
@@ -207,9 +204,9 @@ public class WorkerIT extends ITBaseMini {
miniFluo.waitForObservers();
- try (TypedSnapshot snap = typeLayer.wrap(client.newSnapshot())) {
- Assert.assertEquals(10, snap.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertEquals("", snap.get().row("IDEG10").fam("node").qual("N0003").toString());
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertEquals("10", snap.gets("N0003", DEGREE));
+ Assert.assertEquals("", snap.gets("IDEG10", new Column("node", "N0003")));
}
nf2.stop();
@@ -220,10 +217,10 @@ public class WorkerIT extends ITBaseMini {
miniFluo.waitForObservers();
- try (TypedSnapshot snap = typeLayer.wrap(client.newSnapshot())) {
- Assert.assertEquals(19, snap.get().row("N0003").fam("attr").qual("degree").toInteger(0));
- Assert.assertEquals("", snap.get().row("IDEG19").fam("node").qual("N0003").toString());
- Assert.assertNull(snap.get().row("IDEG10").fam("node").qual("N0003").toString());
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertEquals("19", snap.gets("N0003", DEGREE));
+ Assert.assertEquals("", snap.gets("IDEG19", new Column("node", "N0003")));
+ Assert.assertNull(snap.gets("IDEG10", new Column("node", "N0003")));
}
nf1.stop();
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 755fcfe..b26f0ee 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -20,20 +20,18 @@ import java.util.Arrays;
import java.util.List;
import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedLoader;
-import org.apache.fluo.api.types.TypedObserver;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.integration.ITBaseMini;
+import org.apache.fluo.integration.TestUtil;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
@@ -43,16 +41,16 @@ import org.junit.Test;
public class LogIT extends ITBaseMini {
- private static TypeLayer tl = new TypeLayer(new StringEncoder());
+ private static final Column STAT_COUNT = new Column("stat", "count");
- static class SimpleLoader extends TypedLoader {
- @Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
- tx.mutate().row("r1").fam("a").qual("b").increment(1);
+ static class SimpleLoader implements Loader {
+
+ public void load(TransactionBase tx, Context context) throws Exception {
+ TestUtil.increment(tx, "r1", new Column("a", "b"), 1);
}
}
- static class TriggerLoader extends TypedLoader {
+ static class TriggerLoader implements Loader {
int r;
@@ -61,9 +59,9 @@ public class LogIT extends ITBaseMini {
}
@Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
- tx.mutate().row(r).fam("stat").qual("count").set(1);
- tx.mutate().row(r).fam("stat").qual("count").weaklyNotify();
+ public void load(TransactionBase tx, Context context) throws Exception {
+ tx.set(r + "", STAT_COUNT, "1");
+ tx.setWeakNotification(r + "", STAT_COUNT);
}
}
@@ -75,10 +73,10 @@ public class LogIT extends ITBaseMini {
private static Column bCol2 = new Column(Bytes.of(new byte[] {'c', 0x09, '2'}),
Bytes.of(new byte[] {'c', (byte) 0xe5, '2'}));
- static class BinaryLoader1 extends TypedLoader {
+ static class BinaryLoader1 implements Loader {
@Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
+ public void load(TransactionBase tx, Context context) throws Exception {
tx.delete(bRow1, bCol1);
tx.get(bRow2, bCol1);
@@ -92,7 +90,7 @@ public class LogIT extends ITBaseMini {
}
}
- public static class BinaryObserver extends TypedObserver {
+ public static class BinaryObserver extends AbstractObserver {
@Override
public ObservedColumn getObservedColumn() {
@@ -100,23 +98,23 @@ public class LogIT extends ITBaseMini {
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
+ public void process(TransactionBase tx, Bytes row, Column col) {
tx.get(bRow1, bCol2);
tx.get(bRow2, ImmutableSet.of(bCol1, bCol2));
tx.get(ImmutableSet.of(bRow1, bRow2), ImmutableSet.of(bCol1, bCol2));
}
}
- public static class TestObserver extends TypedObserver {
+ public static class TestObserver extends AbstractObserver {
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(tl.bc().fam("stat").qual("count").vis(), NotificationType.WEAK);
+ return new ObservedColumn(STAT_COUNT, NotificationType.WEAK);
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
- tx.mutate().row("all").col(col).increment(tx.get().row(row).col(col).toInteger());
+ public void process(TransactionBase tx, Bytes row, Column col) {
+ TestUtil.increment(tx, "all", col, Integer.parseInt(tx.gets(row.toString(), col)));
}
}
@@ -245,9 +243,9 @@ public class LogIT extends ITBaseMini {
}
miniFluo.waitForObservers();
- try (TypedSnapshot snap = tl.wrap(client.newSnapshot())) {
- Assert.assertTrue(snap.get().row("all").fam("stat").qual("count").toInteger(-1) >= 1);
- Assert.assertEquals(1, snap.get().row("r1").fam("a").qual("b").toInteger(-1));
+ try (Snapshot snap = client.newSnapshot()) {
+ Assert.assertTrue(Integer.parseInt(snap.gets("all", STAT_COUNT)) >= 1);
+ Assert.assertEquals("1", snap.gets("r1", new Column("a", "b")));
}
} finally {
logger.removeAppender(appender);
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
index f67f453..d96dc1a 100644
--- a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
+++ b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
@@ -24,8 +24,6 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.mapreduce.FluoKeyValue;
@@ -45,8 +43,6 @@ import org.junit.rules.TemporaryFolder;
public class FluoFileOutputFormatIT extends ITBaseImpl {
- static final TypeLayer typeLayer = new TypeLayer(new StringEncoder());
-
public static class TestMapper extends Mapper<LongWritable, Text, Key, Value> {
private FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
@@ -106,25 +102,25 @@ public class FluoFileOutputFormatIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
TestTransaction tx2 = new TestTransaction(env);
- Assert.assertEquals(1, tx1.get().row("a").fam("b").qual("c").toInteger(0));
- Assert.assertEquals(2, tx1.get().row("d").fam("b").qual("c").toInteger(0));
- Assert.assertEquals(90, tx1.get().row("foo").fam("moo").qual("moo").toInteger(0));
+ Assert.assertEquals("1", tx1.gets("a", new Column("b", "c")));
+ Assert.assertEquals("2", tx1.gets("d", new Column("b", "c")));
+ Assert.assertEquals("90", tx1.gets("foo", new Column("moo", "moo")));
- tx1.mutate().row("a").fam("b").qual("c").set("3");
- tx1.mutate().row("d").fam("b").qual("c").delete();
+ tx1.set("a", new Column("b", "c"), "3");
+ tx1.delete("d", new Column("b", "c"));
tx1.done();
// should not see changes from tx1
- Assert.assertEquals(1, tx2.get().row("a").fam("b").qual("c").toInteger(0));
- Assert.assertEquals(2, tx2.get().row("d").fam("b").qual("c").toInteger(0));
- Assert.assertEquals(90, tx2.get().row("foo").fam("moo").qual("moo").toInteger(0));
+ Assert.assertEquals("1", tx2.gets("a", new Column("b", "c")));
+ Assert.assertEquals("2", tx2.gets("d", new Column("b", "c")));
+ Assert.assertEquals("90", tx2.gets("foo", new Column("moo", "moo")));
TestTransaction tx3 = new TestTransaction(env);
// should see changes from tx1
- Assert.assertEquals(3, tx3.get().row("a").fam("b").qual("c").toInteger(0));
- Assert.assertNull(tx3.get().row("d").fam("b").qual("c").toInteger());
- Assert.assertEquals(90, tx3.get().row("foo").fam("moo").qual("moo").toInteger(0));
+ Assert.assertEquals("3", tx3.gets("a", new Column("b", "c")));
+ Assert.assertNull(tx3.gets("d", new Column("b", "c")));
+ Assert.assertEquals("90", tx3.gets("foo", new Column("moo", "moo")));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
----------------------------------------------------------------------
diff --git a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
index c515d7b..7f06bf7 100644
--- a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
+++ b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/MutationBuilderIT.java
@@ -18,8 +18,7 @@ package org.apache.fluo.mapreduce.it;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
+import org.apache.fluo.api.data.Column;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
import org.apache.fluo.mapreduce.FluoMutationGenerator;
@@ -28,8 +27,6 @@ import org.junit.Test;
public class MutationBuilderIT extends ITBaseImpl {
- static final TypeLayer tl = new TypeLayer(new StringEncoder());
-
@Test
public void testBatchWrite() throws Exception {
// test initializing a Fluo table by batch writing to it
@@ -39,15 +36,15 @@ public class MutationBuilderIT extends ITBaseImpl {
try {
FluoMutationGenerator mb1 = new FluoMutationGenerator(Bytes.of("row1"));
- mb1.put(tl.bc().fam("cf1").qual("cq1").vis(), Bytes.of("v1"));
- mb1.put(tl.bc().fam("cf1").qual("cq2").vis(), Bytes.of("v2"));
- mb1.put(tl.bc().fam("cf1").qual("cq3").vis(), Bytes.of("v3"));
+ mb1.put(new Column("cf1", "cq1"), Bytes.of("v1"));
+ mb1.put(new Column("cf1", "cq2"), Bytes.of("v2"));
+ mb1.put(new Column("cf1", "cq3"), Bytes.of("v3"));
bw.addMutation(mb1.build());
FluoMutationGenerator mb2 = new FluoMutationGenerator(Bytes.of("row2"));
- mb2.put(tl.bc().fam("cf1").qual("cq1").vis(), Bytes.of("v4"));
- mb2.put(tl.bc().fam("cf1").qual("cq2").vis(), Bytes.of("v5"));
+ mb2.put(new Column("cf1", "cq1"), Bytes.of("v4"));
+ mb2.put(new Column("cf1", "cq2"), Bytes.of("v5"));
bw.addMutation(mb2.build());
@@ -58,32 +55,32 @@ public class MutationBuilderIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
TestTransaction tx2 = new TestTransaction(env);
- Assert.assertEquals("v1", tx1.get().row("row1").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v2", tx1.get().row("row1").fam("cf1").qual("cq2").toString());
- Assert.assertEquals("v3", tx1.get().row("row1").fam("cf1").qual("cq3").toString());
- Assert.assertEquals("v4", tx1.get().row("row2").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v5", tx1.get().row("row2").fam("cf1").qual("cq2").toString());
+ Assert.assertEquals("v1", tx1.gets("row1", new Column("cf1", "cq1")));
+ Assert.assertEquals("v2", tx1.gets("row1", new Column("cf1", "cq2")));
+ Assert.assertEquals("v3", tx1.gets("row1", new Column("cf1", "cq3")));
+ Assert.assertEquals("v4", tx1.gets("row2", new Column("cf1", "cq1")));
+ Assert.assertEquals("v5", tx1.gets("row2", new Column("cf1", "cq2")));
- tx1.mutate().row("row1").fam("cf1").qual("cq2").set("v6");
- tx1.mutate().row("row1").fam("cf1").qual("cq3").delete();
- tx1.mutate().row("row2").fam("cf1").qual("cq2").set("v7");
+ tx1.set("row1", new Column("cf1", "cq2"), "v6");
+ tx1.delete("row1", new Column("cf1", "cq3"));
+ tx1.set("row2", new Column("cf1", "cq2"), "v7");
tx1.done();
// tx2 should see not changes from tx1
- Assert.assertEquals("v1", tx2.get().row("row1").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v2", tx2.get().row("row1").fam("cf1").qual("cq2").toString());
- Assert.assertEquals("v3", tx2.get().row("row1").fam("cf1").qual("cq3").toString());
- Assert.assertEquals("v4", tx2.get().row("row2").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v5", tx2.get().row("row2").fam("cf1").qual("cq2").toString());
+ Assert.assertEquals("v1", tx2.gets("row1", new Column("cf1", "cq1")));
+ Assert.assertEquals("v2", tx2.gets("row1", new Column("cf1", "cq2")));
+ Assert.assertEquals("v3", tx2.gets("row1", new Column("cf1", "cq3")));
+ Assert.assertEquals("v4", tx2.gets("row2", new Column("cf1", "cq1")));
+ Assert.assertEquals("v5", tx2.gets("row2", new Column("cf1", "cq2")));
TestTransaction tx3 = new TestTransaction(env);
// should see changes from tx1
- Assert.assertEquals("v1", tx3.get().row("row1").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v6", tx3.get().row("row1").fam("cf1").qual("cq2").toString());
- Assert.assertNull(tx3.get().row("row1").fam("cf1").qual("cq3").toString());
- Assert.assertEquals("v4", tx3.get().row("row2").fam("cf1").qual("cq1").toString());
- Assert.assertEquals("v7", tx3.get().row("row2").fam("cf1").qual("cq2").toString());
+ Assert.assertEquals("v1", tx3.gets("row1", new Column("cf1", "cq1")));
+ Assert.assertEquals("v6", tx3.gets("row1", new Column("cf1", "cq2")));
+ Assert.assertNull(tx3.gets("row1", new Column("cf1", "cq3")));
+ Assert.assertEquals("v4", tx3.gets("row2", new Column("cf1", "cq1")));
+ Assert.assertEquals("v7", tx3.gets("row2", new Column("cf1", "cq2")));
}
}
[3/3] incubator-fluo git commit: #696 - Updated integration tests to
use core API instead of type layer
Posted by mw...@apache.org.
#696 - Updated integration tests to use core API instead of type layer
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/b2c91b95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/b2c91b95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/b2c91b95
Branch: refs/heads/master
Commit: b2c91b95be1b665867b5cab74e80541cb700e57a
Parents: e84b4f1
Author: Mike Walch <mw...@gmail.com>
Authored: Wed Jul 6 15:18:52 2016 -0400
Committer: Mike Walch <mw...@gmail.com>
Committed: Thu Jul 7 15:57:06 2016 -0400
----------------------------------------------------------------------
.../org/apache/fluo/integration/BankUtil.java | 17 +-
.../fluo/integration/TestTransaction.java | 103 +++++-
.../org/apache/fluo/integration/TestUtil.java | 42 +++
.../fluo/integration/impl/AppConfigIT.java | 45 ++-
.../integration/impl/ClientExceptionIT.java | 52 +--
.../fluo/integration/impl/CollisionIT.java | 63 ++--
.../fluo/integration/impl/ColumnVisIT.java | 13 +-
.../apache/fluo/integration/impl/FailureIT.java | 239 +++++++-------
.../fluo/integration/impl/FaultyConfig.java | 6 +-
.../apache/fluo/integration/impl/FluoIT.java | 313 +++++++++----------
.../impl/GarbageCollectionIteratorIT.java | 17 +-
.../fluo/integration/impl/NotificationGcIT.java | 25 +-
.../fluo/integration/impl/ObserverConfigIT.java | 26 +-
.../apache/fluo/integration/impl/OracleIT.java | 3 -
.../integration/impl/ParallelScannerIT.java | 87 +++---
.../integration/impl/SelfNotificationIT.java | 55 ++--
.../fluo/integration/impl/StochasticBankIT.java | 15 +-
.../integration/impl/WeakNotificationIT.java | 54 ++--
.../impl/WeakNotificationOverlapIT.java | 110 +++----
.../apache/fluo/integration/impl/WorkerIT.java | 83 +++--
.../org/apache/fluo/integration/log/LogIT.java | 50 ++-
.../mapreduce/it/FluoFileOutputFormatIT.java | 26 +-
.../fluo/mapreduce/it/MutationBuilderIT.java | 51 ++-
23 files changed, 762 insertions(+), 733 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java b/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java
index 7937c5e..37e7df2 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/BankUtil.java
@@ -16,8 +16,6 @@
package org.apache.fluo.integration;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
import org.apache.fluo.core.impl.Environment;
/**
@@ -25,28 +23,27 @@ import org.apache.fluo.core.impl.Environment;
*/
public class BankUtil {
- public static final TypeLayer typeLayer = new TypeLayer(new StringEncoder());
- public static final Column BALANCE = typeLayer.bc().fam("account").qual("balance").vis();
+ public static final Column BALANCE = new Column("account", "balance");
private BankUtil() {}
public static void transfer(Environment env, String from, String to, int amount) throws Exception {
TestTransaction tx = new TestTransaction(env);
- int bal1 = tx.get().row(from).col(BALANCE).toInteger();
- int bal2 = tx.get().row(to).col(BALANCE).toInteger();
+ int bal1 = Integer.parseInt(tx.gets(from, BALANCE));
+ int bal2 = Integer.parseInt(tx.gets(to, BALANCE));
- tx.mutate().row(from).col(BALANCE).set(bal1 - amount);
- tx.mutate().row(to).col(BALANCE).set(bal2 + amount);
+ tx.set(from, BALANCE, (bal1 - amount) + "");
+ tx.set(to, BALANCE, (bal2 + amount) + "");
tx.done();
}
public static void setBalance(TestTransaction tx, String user, int amount) {
- tx.mutate().row(user).col(BALANCE).set(amount);
+ tx.set(user, BALANCE, amount + "");
}
public static int getBalance(TestTransaction tx, String user) {
- return tx.get().row(user).col(BALANCE).toInteger();
+ return Integer.parseInt(tx.gets(user, BALANCE));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
index b80b980..b64cda5 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java
@@ -15,7 +15,10 @@
package org.apache.fluo.integration;
+import java.util.Collection;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -28,14 +31,14 @@ import org.apache.fluo.accumulo.iterators.NotificationIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.exceptions.AlreadySetException;
import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.iterator.RowIterator;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
@@ -48,7 +51,7 @@ import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.io.Text;
-public class TestTransaction extends TypedTransactionBase implements TransactionBase {
+public class TestTransaction implements TransactionBase {
private TransactionImpl tx;
private Environment env;
@@ -78,18 +81,15 @@ public class TestTransaction extends TypedTransactionBase implements Transaction
throw new RuntimeException("No notification found");
}
- @SuppressWarnings("resource")
public TestTransaction(Environment env, TransactorNode transactor) {
- this(new TransactionImpl(env).setTransactor(transactor), new StringEncoder(), env);
+ this(new TransactionImpl(env).setTransactor(transactor), env);
}
public TestTransaction(Environment env) {
- this(new TransactionImpl(env), new StringEncoder(), env);
+ this(new TransactionImpl(env), env);
}
- private TestTransaction(TransactionImpl transactionImpl, StringEncoder stringEncoder,
- Environment env) {
- super(transactionImpl, stringEncoder, new TypeLayer(stringEncoder));
+ private TestTransaction(TransactionImpl transactionImpl, Environment env) {
this.tx = transactionImpl;
this.env = env;
}
@@ -99,8 +99,7 @@ public class TestTransaction extends TypedTransactionBase implements Transaction
}
public TestTransaction(Environment env, String trow, Column tcol, long notificationTS) {
- this(new TransactionImpl(env, new Notification(Bytes.of(trow), tcol, notificationTS)),
- new StringEncoder(), env);
+ this(new TransactionImpl(env, new Notification(Bytes.of(trow), tcol, notificationTS)), env);
}
/**
@@ -154,4 +153,84 @@ public class TestTransaction extends TypedTransactionBase implements Transaction
public TxStats getStats() {
return tx.getStats();
}
+
+ @Override
+ public void delete(Bytes row, Column col) {
+ tx.delete(row, col);
+ }
+
+ @Override
+ public void delete(String row, Column col) {
+ tx.delete(row, col);
+ }
+
+ @Override
+ public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
+ tx.set(row, col, value);
+ }
+
+ @Override
+ public void set(String row, Column col, String value) throws AlreadySetException {
+ tx.set(row, col, value);
+ }
+
+ @Override
+ public void setWeakNotification(Bytes row, Column col) {
+ tx.setWeakNotification(row, col);
+ }
+
+ @Override
+ public void setWeakNotification(String row, Column col) {
+ tx.setWeakNotification(row, col);
+ }
+
+ @Override
+ public Bytes get(Bytes row, Column column) {
+ return tx.get(row, column);
+ }
+
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+ return tx.get(row, columns);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+ return tx.get(rows, columns);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+ return tx.get(rowColumns);
+ }
+
+ @Override
+ public RowIterator get(ScannerConfiguration config) {
+ return tx.get(config);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+ return tx.gets(rowColumns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+ return tx.gets(rows, columns);
+ }
+
+ @Override
+ public String gets(String row, Column column) {
+ return tx.gets(row, column);
+ }
+
+ @Override
+ public Map<Column, String> gets(String row, Set<Column> columns) {
+ return tx.gets(row, columns);
+ }
+
+ @Override
+ public long getStartTimestamp() {
+ return tx.getStartTimestamp();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/TestUtil.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestUtil.java b/modules/integration/src/test/java/org/apache/fluo/integration/TestUtil.java
new file mode 100644
index 0000000..fe9ee27
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestUtil.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Column;
+
+public class TestUtil {
+
+ private TestUtil() {}
+
+ public static void increment(TransactionBase tx, String row, Column col, int val) {
+ int prev = 0;
+ String prevStr = tx.gets(row, col);
+ if (prevStr != null) {
+ prev = Integer.parseInt(prevStr);
+ }
+ tx.set(row, col, prev + val + "");
+ }
+
+ public static int getOrDefault(SnapshotBase snap, String row, Column col, int defaultVal) {
+ String val = snap.gets(row, col);
+ if (val == null) {
+ return defaultVal;
+ }
+ return Integer.parseInt(val);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
index fedacbf..74f40e4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
@@ -21,17 +21,15 @@ import java.util.List;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedLoader;
-import org.apache.fluo.api.types.TypedObserver;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
import org.junit.Test;
@@ -77,7 +75,7 @@ public class AppConfigIT extends ITBaseMini {
}
- public static class TestLoader extends TypedLoader {
+ private static class TestLoader implements Loader {
private String row;
private int data;
@@ -88,16 +86,15 @@ public class AppConfigIT extends ITBaseMini {
}
@Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
+ public void load(TransactionBase tx, Context context) throws Exception {
int limit = context.getAppConfiguration().getInt("myapp.sizeLimit");
if (data < limit) {
- tx.mutate().row(row).fam("data").qual("foo").set(data);
+ tx.set(row, new Column("data", "foo"), Integer.toString(data));
}
}
-
}
- public static class TestObserver extends TypedObserver {
+ public static class TestObserver extends AbstractObserver {
private int limit;
@@ -112,13 +109,12 @@ public class AppConfigIT extends ITBaseMini {
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
- int d = tx.get().row(row).col(col).toInteger();
+ public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+ int d = Integer.parseInt(tx.gets(row.toString(), col));
if (2 * d < limit) {
- tx.mutate().row(row).fam("data").qual("bar").set(2 * d);
+ tx.set(row.toString(), new Column("data", "bar"), Integer.toString(2 * d));
}
}
-
}
@Test
@@ -130,21 +126,18 @@ public class AppConfigIT extends ITBaseMini {
le.execute(new TestLoader("r3", 60000));
}
- TypeLayer tl = new TypeLayer(new StringEncoder());
-
- try (TypedSnapshot snapshot = tl.wrap(client.newSnapshot())) {
- Assert.assertEquals(3, snapshot.get().row("r1").fam("data").qual("foo").toInteger(0));
- Assert.assertEquals(30000, snapshot.get().row("r2").fam("data").qual("foo").toInteger(0));
- Assert.assertEquals(0, snapshot.get().row("r3").fam("data").qual("foo").toInteger(0));
+ try (Snapshot snapshot = client.newSnapshot()) {
+ Assert.assertEquals("3", snapshot.gets("r1", new Column("data", "foo")));
+ Assert.assertEquals("30000", snapshot.gets("r2", new Column("data", "foo")));
+ Assert.assertNull(snapshot.gets("r3", new Column("data", "foo")));
}
miniFluo.waitForObservers();
- try (TypedSnapshot snapshot = tl.wrap(client.newSnapshot())) {
- Assert.assertEquals(6, snapshot.get().row("r1").fam("data").qual("bar").toInteger(0));
- Assert.assertEquals(0, snapshot.get().row("r2").fam("data").qual("bar").toInteger(0));
- Assert.assertEquals(0, snapshot.get().row("r3").fam("data").qual("bar").toInteger(0));
+ try (Snapshot snapshot = client.newSnapshot()) {
+ Assert.assertEquals("6", snapshot.gets("r1", new Column("data", "bar")));
+ Assert.assertNull(snapshot.gets("r2", new Column("data", "bar")));
+ Assert.assertNull(snapshot.gets("r3", new Column("data", "bar")));
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/ClientExceptionIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ClientExceptionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ClientExceptionIT.java
index 30419ab..9807101 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ClientExceptionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ClientExceptionIT.java
@@ -19,9 +19,6 @@ import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.AlreadySetException;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedTransaction;
import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
import org.junit.Test;
@@ -31,8 +28,6 @@ import org.junit.Test;
*/
public class ClientExceptionIT extends ITBaseMini {
- static TypeLayer tl = new TypeLayer(new StringEncoder());
-
@Test
public void testAlreadySetException() {
@@ -42,54 +37,23 @@ public class ClientExceptionIT extends ITBaseMini {
tx.set(Bytes.of("row"), new Column("c1"), Bytes.of("val2"));
Assert.fail("exception not thrown");
} catch (AlreadySetException e) {
+ // do nothing
}
try (Transaction tx = client.newTransaction()) {
- tx.delete(Bytes.of("row"), new Column("c1"));
- tx.delete(Bytes.of("row"), new Column("c1"));
- Assert.fail("exception not thrown");
- } catch (AlreadySetException e) {
- }
-
- // test typed transactions
- // setting integer
- try (TypedTransaction tx = tl.wrap(client.newTransaction())) {
- tx.mutate().row("r1").col(new Column("c1")).set("a");
- tx.mutate().row("r1").col(new Column("c1")).set(6);
- Assert.fail("exception not thrown");
- } catch (AlreadySetException e) {
- }
-
- // test set setting empty twice
- try (TypedTransaction tx = tl.wrap(client.newTransaction())) {
- tx.mutate().row("r1").col(new Column("c1")).set();
- tx.mutate().row("r1").col(new Column("c1")).set();
+ tx.set("row", new Column("c2"), "a");
+ tx.set("row", new Column("c2"), "b");
Assert.fail("exception not thrown");
} catch (AlreadySetException e) {
+ // do nothing
}
- // test boolean and same value
- try (TypedTransaction tx = tl.wrap(client.newTransaction())) {
- tx.mutate().row("r1").col(new Column("c1")).set(true);
- tx.mutate().row("r1").col(new Column("c1")).set(true);
- Assert.fail("exception not thrown");
- } catch (AlreadySetException e) {
- }
-
- // test string
- try (TypedTransaction tx = tl.wrap(client.newTransaction())) {
- tx.mutate().row("r1").col(new Column("c1")).set("a");
- tx.mutate().row("r1").col(new Column("c1")).set("b");
- Assert.fail("exception not thrown");
- } catch (AlreadySetException e) {
- }
-
- // test two deletes
- try (TypedTransaction tx = tl.wrap(client.newTransaction())) {
- tx.mutate().row("r1").col(new Column("c1")).delete();
- tx.mutate().row("r1").col(new Column("c1")).delete();
+ try (Transaction tx = client.newTransaction()) {
+ tx.delete(Bytes.of("row"), new Column("c1"));
+ tx.delete(Bytes.of("row"), new Column("c1"));
Assert.fail("exception not thrown");
} catch (AlreadySetException e) {
+ // do nothing
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
index 543150c..4af93d1 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -15,7 +15,7 @@
package org.apache.fluo.integration.impl;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
@@ -27,20 +27,20 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
+import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedLoader;
-import org.apache.fluo.api.types.TypedObserver;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.fluo.integration.ITBaseMini;
+import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Test;
@@ -56,9 +56,12 @@ import org.junit.Test;
*
*/
public class CollisionIT extends ITBaseMini {
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
- static class NumLoader extends TypedLoader {
+ private static final Column STAT_TOTAL = new Column("stat", "total");
+ private static final Column STAT_CHANGED = new Column("stat", "changed");
+ private static final Column STAT_PROCESSED = new Column("stat", "processed");
+
+ private static class NumLoader implements Loader {
int num;
@@ -67,33 +70,33 @@ public class CollisionIT extends ITBaseMini {
}
@Override
- public void load(TypedTransactionBase tx, Context context) throws Exception {
- tx.mutate().row(num).fam("stat").qual("total").increment(1);
- tx.mutate().row(num).fam("stat").qual("changed").weaklyNotify();
+ public void load(TransactionBase tx, Context context) throws Exception {
+ TestUtil.increment(tx, num + "", STAT_TOTAL, 1);
+ tx.setWeakNotification(num + "", STAT_CHANGED);
}
}
- public static class TotalObserver extends TypedObserver {
+ public static class TotalObserver extends AbstractObserver {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(typeLayer.bc().fam("stat").qual("changed").vis(),
- NotificationType.WEAK);
+ public Observer.ObservedColumn getObservedColumn() {
+ return new Observer.ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
- int total = tx.get().row(row).fam("stat").qual("total").toInteger();
- int processed = tx.get().row(row).fam("stat").qual("processed").toInteger(0);
+ public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
+ String row = rowBytes.toString();
+ int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
+ int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
- tx.mutate().row(row).fam("stat").qual("processed").set(total);
- tx.mutate().row("all").fam("stat").qual("total").increment(total - processed);
+ tx.set(row, STAT_PROCESSED, total + "");
+ TestUtil.increment(tx, "all", STAT_TOTAL, total - processed);
}
}
@Override
protected List<ObserverConfiguration> getObservers() {
- return Arrays.asList(new ObserverConfiguration(TotalObserver.class.getName()));
+ return Collections.singletonList(new ObserverConfiguration(TotalObserver.class.getName()));
}
@Override
@@ -127,16 +130,20 @@ public class CollisionIT extends ITBaseMini {
miniFluo.waitForObservers();
- try (TypedSnapshot snapshot = typeLayer.wrap(client.newSnapshot())) {
+ try (Snapshot snapshot = client.newSnapshot()) {
for (int i = 0; i < expectedCounts.length; i++) {
- Assert.assertEquals(expectedCounts[i], snapshot.get().row(i).fam("stat").qual("total")
- .toInteger(-1));
- Assert.assertEquals(expectedCounts[i], snapshot.get().row(i).fam("stat").qual("processed")
- .toInteger(-1));
+ String total = snapshot.gets(i + "", STAT_TOTAL);
+ Assert.assertNotNull(total);
+ Assert.assertEquals(expectedCounts[i], Integer.parseInt(total));
+ String processed = snapshot.gets(i + "", STAT_PROCESSED);
+ Assert.assertNotNull(processed);
+ Assert.assertEquals(expectedCounts[i], Integer.parseInt(processed));
}
- Assert.assertEquals(1000, snapshot.get().row("all").fam("stat").qual("total").toInteger(-1));
+ String allTotal = snapshot.gets("all", STAT_TOTAL);
+ Assert.assertNotNull(allTotal);
+ Assert.assertEquals(1000, Integer.parseInt(allTotal));
}
long oldestTS = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers());
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/ColumnVisIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ColumnVisIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ColumnVisIT.java
index 27646c9..2f94b26 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ColumnVisIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ColumnVisIT.java
@@ -17,6 +17,7 @@ package org.apache.fluo.integration.impl;
import java.util.Arrays;
+import com.google.common.collect.Sets;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
@@ -29,7 +30,7 @@ public class ColumnVisIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
// expect set w/ bad col vis to fail fast
- tx1.mutate().row("r").fam("f").qual("q").vis("A&").set("v");
+ tx1.set("r", new Column("f", "q", "A&"), "v");
}
@Test(expected = Exception.class)
@@ -37,7 +38,7 @@ public class ColumnVisIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
// expect delete w/ bad col vis to fail fast
- tx1.mutate().row("r").fam("f").qual("q").vis("A&").delete();
+ tx1.delete("r", new Column("f", "q", "A&"));
}
@Test(expected = Exception.class)
@@ -45,7 +46,7 @@ public class ColumnVisIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
// expect weaknotify w/ bad col vis to fail fast
- tx1.mutate().row("r").fam("f").qual("q").vis("A&").weaklyNotify();
+ tx1.setWeakNotification("r", new Column("f", "q", "A&"));
}
@Test(expected = Exception.class)
@@ -53,7 +54,7 @@ public class ColumnVisIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
// expect get w/ bad col vis to fail fast
- tx1.get().row("r").fam("f").qual("q").vis("A&").toString();
+ tx1.gets("r", new Column("f", "q", "A&"));
}
@Test(expected = Exception.class)
@@ -64,7 +65,7 @@ public class ColumnVisIT extends ITBaseImpl {
Column col2 = new Column("f", "q", "C|");
// expect get cols w/ bad col vis to fail fast
- tx1.get().row("r").columns(col1, col2).size();
+ tx1.gets("r", Sets.newHashSet(col1, col2)).size();
}
@Test(expected = Exception.class)
@@ -75,6 +76,6 @@ public class ColumnVisIT extends ITBaseImpl {
Column col2 = new Column("f", "q", "C|");
// expect get rows cols w/ bad col vis to fail fast
- tx1.get().rowsString(Arrays.asList("r1", "r2")).columns(col1, col2).toStringMap().size();
+ tx1.gets(Arrays.asList("r1", "r2"), Sets.newHashSet(col1, col2)).size();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
index d0b9986..38a857e 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -38,8 +38,6 @@ import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.exceptions.StaleScanException;
import org.apache.fluo.core.impl.Notification;
@@ -50,6 +48,7 @@ import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.BankUtil;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
+import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -62,8 +61,6 @@ public class FailureIT extends ITBaseImpl {
@Rule
public ExpectedException exception = ExpectedException.none();
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
-
public static class NullObserver extends AbstractObserver {
@Override
@@ -71,8 +68,7 @@ public class FailureIT extends ITBaseImpl {
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(typeLayer.bc().fam("attr").qual("lastupdate").vis(),
- NotificationType.STRONG);
+ return new ObservedColumn(new Column("attr", "lastupdate"), NotificationType.STRONG);
}
}
@@ -93,18 +89,19 @@ public class FailureIT extends ITBaseImpl {
testRollbackMany(false);
}
- public void testRollbackMany(boolean killTransactor) throws Exception {
+ private void testRollbackMany(boolean killTransactor) throws Exception {
// test writing lots of columns that need to be rolled back
- Column col1 = typeLayer.bc().fam("fam1").qual("q1").vis();
- Column col2 = typeLayer.bc().fam("fam1").qual("q2").vis();
+ Column col1 = new Column("fam1", "q1");
+ Column col2 = new Column("fam1", "q2");
TestTransaction tx = new TestTransaction(env);
for (int r = 0; r < 10; r++) {
- tx.mutate().row(r + "").col(col1).set("0" + r + "0");
- tx.mutate().row(r + "").col(col2).set("0" + r + "1");
+ String row = Integer.toString(r);
+ tx.set(row, col1, "0" + r + "0");
+ tx.set(row, col2, "0" + r + "1");
}
tx.done();
@@ -113,8 +110,8 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx2 = new TestTransaction(env, t2);
for (int r = 0; r < 10; r++) {
- tx2.mutate().row(r + "").col(col1).set("1" + r + "0");
- tx2.mutate().row(r + "").col(col2).set("1" + r + "1");
+ tx2.set(r + "", col1, "1" + r + "0");
+ tx2.set(r + "", col2, "1" + r + "1");
}
CommitData cd = tx2.createCommitData();
@@ -126,8 +123,8 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx3 = new TestTransaction(env);
for (int r = 0; r < 10; r++) {
- Assert.assertEquals("0" + r + "0", tx3.get().row(r + "").col(col1).toString());
- Assert.assertEquals("0" + r + "1", tx3.get().row(r + "").col(col2).toString());
+ Assert.assertEquals("0" + r + "0", tx3.gets(r + "", col1));
+ Assert.assertEquals("0" + r + "1", tx3.gets(r + "", col2));
}
if (killTransactor) {
@@ -142,8 +139,8 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx4 = new TestTransaction(env);
for (int r = 0; r < 10; r++) {
- Assert.assertEquals("0" + r + "0", tx4.get().row(r + "").col(col1).toString());
- Assert.assertEquals("0" + r + "1", tx4.get().row(r + "").col(col2).toString());
+ Assert.assertEquals("0" + r + "0", tx4.gets(r + "", col1));
+ Assert.assertEquals("0" + r + "1", tx4.gets(r + "", col2));
}
}
@@ -157,17 +154,17 @@ public class FailureIT extends ITBaseImpl {
testRollforwardMany(false);
}
- public void testRollforwardMany(boolean killTransactor) throws Exception {
+ private void testRollforwardMany(boolean killTransactor) throws Exception {
// test writing lots of columns that need to be rolled forward
- Column col1 = typeLayer.bc().fam("fam1").qual("q1").vis();
- Column col2 = typeLayer.bc().fam("fam1").qual("q2").vis();
+ Column col1 = new Column("fam1", "q1");
+ Column col2 = new Column("fam1", "q2");
TestTransaction tx = new TestTransaction(env);
for (int r = 0; r < 10; r++) {
- tx.mutate().row(r + "").col(col1).set("0" + r + "0");
- tx.mutate().row(r + "").col(col2).set("0" + r + "1");
+ tx.set(r + "", col1, "0" + r + "0");
+ tx.set(r + "", col2, "0" + r + "1");
}
tx.done();
@@ -176,8 +173,8 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx2 = new TestTransaction(env, t2);
for (int r = 0; r < 10; r++) {
- tx2.mutate().row(r + "").col(col1).set("1" + r + "0");
- tx2.mutate().row(r + "").col(col2).set("1" + r + "1");
+ tx2.set(r + "", col1, "1" + r + "0");
+ tx2.set(r + "", col2, "1" + r + "1");
}
CommitData cd = tx2.createCommitData();
@@ -191,16 +188,16 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx3 = new TestTransaction(env);
for (int r = 0; r < 10; r++) {
- Assert.assertEquals("1" + r + "0", tx3.get().row(r + "").col(col1).toString());
- Assert.assertEquals("1" + r + "1", tx3.get().row(r + "").col(col2).toString());
+ Assert.assertEquals("1" + r + "0", tx3.gets(r + "", col1));
+ Assert.assertEquals("1" + r + "1", tx3.gets(r + "", col2));
}
tx2.finishCommit(cd, commitTs);
TestTransaction tx4 = new TestTransaction(env);
for (int r = 0; r < 10; r++) {
- Assert.assertEquals("1" + r + "0", tx4.get().row(r + "").col(col1).toString());
- Assert.assertEquals("1" + r + "1", tx4.get().row(r + "").col(col2).toString());
+ Assert.assertEquals("1" + r + "0", tx4.gets(r + "", col1));
+ Assert.assertEquals("1" + r + "1", tx4.gets(r + "", col2));
}
if (!killTransactor) {
@@ -214,19 +211,19 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- tx.mutate().row("bob").col(BALANCE).set(10);
- tx.mutate().row("joe").col(BALANCE).set(20);
- tx.mutate().row("jill").col(BALANCE).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
TestTransaction tx2 = new TestTransaction(env);
- int bal1 = tx2.get().row("bob").col(BALANCE).toInteger(0);
- int bal2 = tx2.get().row("joe").col(BALANCE).toInteger(0);
+ int bal1 = Integer.parseInt(tx2.gets("bob", BALANCE));
+ int bal2 = Integer.parseInt(tx2.gets("joe", BALANCE));
- tx2.mutate().row("bob").col(BALANCE).set(bal1 - 7);
- tx2.mutate().row("joe").col(BALANCE).set(bal2 + 7);
+ tx2.set("bob", BALANCE, (bal1 - 7) + "");
+ tx2.set("joe", BALANCE, (bal2 + 7) + "");
// get locks
CommitData cd = tx2.createCommitData();
@@ -246,9 +243,9 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx4 = new TestTransaction(env);
- Assert.assertEquals(bobBal, tx4.get().row("bob").col(BALANCE).toInteger(0));
- Assert.assertEquals(joeBal, tx4.get().row("joe").col(BALANCE).toInteger(0));
- Assert.assertEquals(67, tx4.get().row("jill").col(BALANCE).toInteger(0));
+ Assert.assertEquals(bobBal + "", tx4.gets("bob", BALANCE));
+ Assert.assertEquals(joeBal + "", tx4.gets("joe", BALANCE));
+ Assert.assertEquals("67", tx4.gets("jill", BALANCE));
Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
Assert.assertFalse(tx2.commitPrimaryColumn(cd, commitTs));
@@ -259,9 +256,9 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx6 = new TestTransaction(env);
- Assert.assertEquals(bobBal, tx6.get().row("bob").col(BALANCE).toInteger(0));
- Assert.assertEquals(joeBal, tx6.get().row("joe").col(BALANCE).toInteger(0));
- Assert.assertEquals(67, tx6.get().row("jill").col(BALANCE).toInteger(0));
+ Assert.assertEquals(bobBal + "", tx6.gets("bob", BALANCE));
+ Assert.assertEquals(joeBal + "", tx6.gets("joe", BALANCE));
+ Assert.assertEquals("67", tx6.gets("jill", BALANCE));
}
@Test
@@ -279,19 +276,19 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- tx.mutate().row("bob").col(BALANCE).set(10);
- tx.mutate().row("joe").col(BALANCE).set(20);
- tx.mutate().row("jill").col(BALANCE).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
TestTransaction tx2 = new TestTransaction(env, t1);
- int bal1 = tx2.get().row("bob").col(BALANCE).toInteger(0);
- int bal2 = tx2.get().row("joe").col(BALANCE).toInteger(0);
+ int bal1 = Integer.parseInt(tx2.gets("bob", BALANCE));
+ int bal2 = Integer.parseInt(tx2.gets("joe", BALANCE));
- tx2.mutate().row("bob").col(BALANCE).set(bal1 - 7);
- tx2.mutate().row("joe").col(BALANCE).set(bal2 + 7);
+ tx2.set("bob", BALANCE, (bal1 - 7) + "");
+ tx2.set("joe", BALANCE, (bal2 + 7) + "");
CommitData cd = tx2.createCommitData();
Assert.assertTrue(tx2.preCommit(cd));
@@ -338,19 +335,19 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- tx.mutate().row("bob").col(BALANCE).set(10);
- tx.mutate().row("joe").col(BALANCE).set(20);
- tx.mutate().row("jill").col(BALANCE).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
TestTransaction tx2 = new TestTransaction(env);
- int bal1 = tx2.get().row("bob").col(BALANCE).toInteger(0);
- int bal2 = tx2.get().row("joe").col(BALANCE).toInteger(0);
+ int bal1 = Integer.parseInt(tx2.gets("bob", BALANCE));
+ int bal2 = Integer.parseInt(tx2.gets("joe", BALANCE));
- tx2.mutate().row("bob").col(BALANCE).set(bal1 - 7);
- tx2.mutate().row("joe").col(BALANCE).set(bal2 + 7);
+ tx2.set("bob", BALANCE, (bal1 - 7) + "");
+ tx2.set("joe", BALANCE, (bal2 + 7) + "");
// get locks
CommitData cd = tx2.createCommitData();
@@ -371,17 +368,17 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx4 = new TestTransaction(env);
- Assert.assertEquals(bobBal, tx4.get().row("bob").col(BALANCE).toInteger(0));
- Assert.assertEquals(joeBal, tx4.get().row("joe").col(BALANCE).toInteger(0));
- Assert.assertEquals(62, tx4.get().row("jill").col(BALANCE).toInteger(0));
+ Assert.assertEquals(bobBal + "", tx4.gets("bob", BALANCE));
+ Assert.assertEquals(joeBal + "", tx4.gets("joe", BALANCE));
+ Assert.assertEquals("62", tx4.gets("jill", BALANCE));
tx2.finishCommit(cd, commitTs);
TestTransaction tx5 = new TestTransaction(env);
- Assert.assertEquals(bobBal, tx5.get().row("bob").col(BALANCE).toInteger(0));
- Assert.assertEquals(joeBal, tx5.get().row("joe").col(BALANCE).toInteger(0));
- Assert.assertEquals(62, tx5.get().row("jill").col(BALANCE).toInteger(0));
+ Assert.assertEquals(bobBal + "", tx5.gets("bob", BALANCE));
+ Assert.assertEquals(joeBal + "", tx5.gets("joe", BALANCE));
+ Assert.assertEquals("62", tx5.gets("jill", BALANCE));
}
@Test
@@ -390,22 +387,25 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- tx.mutate().row("url0000").fam("attr").qual("lastupdate").set(3);
- tx.mutate().row("url0000").fam("doc").qual("content").set("abc def");
+ final Column lastUpdate = new Column("attr", "lastupdate");
+ final Column docContent = new Column("doc", "content");
+ final Column docUrl = new Column("doc", "url");
+
+ tx.set("url0000", lastUpdate, "3");
+ tx.set("url0000", docContent, "abc def");
tx.done();
- TestTransaction tx2 =
- new TestTransaction(env, "url0000", typeLayer.bc().fam("attr").qual("lastupdate").vis());
- tx2.mutate().row("idx:abc").fam("doc").qual("url").set("url0000");
- tx2.mutate().row("idx:def").fam("doc").qual("url").set("url0000");
+ TestTransaction tx2 = new TestTransaction(env, "url0000", lastUpdate);
+ tx2.set("idx:abc", docUrl, "url0000");
+ tx2.set("idx:def", docUrl, "url0000");
CommitData cd = tx2.createCommitData();
tx2.preCommit(cd);
TestTransaction tx3 = new TestTransaction(env);
- Assert.assertNull(tx3.get().row("idx:abc").fam("doc").qual("url").toString());
- Assert.assertNull(tx3.get().row("idx:def").fam("doc").qual("url").toString());
- Assert.assertEquals(3, tx3.get().row("url0000").fam("attr").qual("lastupdate").toInteger(0));
+ Assert.assertNull(tx3.gets("idx:abc", docUrl));
+ Assert.assertNull(tx3.gets("idx:def", docUrl));
+ Assert.assertEquals("3", tx3.gets("url0000", lastUpdate));
Scanner scanner = env.getConnector().createScanner(env.getTable(), Authorizations.EMPTY);
Notification.configureScanner(scanner);
@@ -413,10 +413,9 @@ public class FailureIT extends ITBaseImpl {
Assert.assertTrue(iter.hasNext());
Assert.assertEquals("url0000", iter.next().getKey().getRow().toString());
- TestTransaction tx5 =
- new TestTransaction(env, "url0000", typeLayer.bc().fam("attr").qual("lastupdate").vis());
- tx5.mutate().row("idx:abc").fam("doc").qual("url").set("url0000");
- tx5.mutate().row("idx:def").fam("doc").qual("url").set("url0000");
+ TestTransaction tx5 = new TestTransaction(env, "url0000", lastUpdate);
+ tx5.set("idx:abc", docUrl, "url0000");
+ tx5.set("idx:def", docUrl, "url0000");
cd = tx5.createCommitData();
Assert.assertTrue(tx5.preCommit(cd));
Stamp commitTs = env.getSharedResources().getOracleClient().getStamp();
@@ -424,25 +423,25 @@ public class FailureIT extends ITBaseImpl {
// should roll tx5 forward
TestTransaction tx6 = new TestTransaction(env);
- Assert.assertEquals(3, tx6.get().row("url0000").fam("attr").qual("lastupdate").toInteger(0));
- Assert.assertEquals("url0000", tx6.get().row("idx:abc").fam("doc").qual("url").toString());
- Assert.assertEquals("url0000", tx6.get().row("idx:def").fam("doc").qual("url").toString());
+ Assert.assertEquals("3", tx6.gets("url0000", lastUpdate));
+ Assert.assertEquals("url0000", tx6.gets("idx:abc", docUrl));
+ Assert.assertEquals("url0000", tx6.gets("idx:def", docUrl));
iter = scanner.iterator();
Assert.assertTrue(iter.hasNext());
// TODO is tx4 start before tx5, then this test will not work because AlreadyAck is not thrown
// for overlapping.. CommitException is thrown
- TestTransaction tx4 =
- new TestTransaction(env, "url0000", typeLayer.bc().fam("attr").qual("lastupdate").vis());
- tx4.mutate().row("idx:abc").fam("doc").qual("url").set("url0000");
- tx4.mutate().row("idx:def").fam("doc").qual("url").set("url0000");
+ TestTransaction tx4 = new TestTransaction(env, "url0000", lastUpdate);
+ tx4.set("idx:abc", docUrl, "url0000");
+ tx4.set("idx:def", docUrl, "url0000");
try {
// should not go through if tx5 is properly rolled forward
tx4.commit();
Assert.fail();
} catch (AlreadyAcknowledgedException aae) {
+ // do nothing
}
// commit above should schedule async delete of notification
@@ -456,14 +455,14 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- tx.mutate().row("bob").col(BALANCE).set(10);
- tx.mutate().row("joe").col(BALANCE).set(20);
- tx.mutate().row("jill").col(BALANCE).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
TestTransaction tx2 = new TestTransaction(env);
- Assert.assertEquals(10, tx2.get().row("bob").col(BALANCE).toInteger(0));
+ Assert.assertEquals("10", tx2.gets("bob", BALANCE));
BankUtil.transfer(env, "joe", "jill", 1);
BankUtil.transfer(env, "joe", "bob", 1);
@@ -472,16 +471,16 @@ public class FailureIT extends ITBaseImpl {
conn.tableOperations().flush(table, null, null, true);
- Assert.assertEquals(20, tx2.get().row("joe").col(BALANCE).toInteger(0));
+ Assert.assertEquals("20", tx2.gets("joe", BALANCE));
// Stale scan should not occur due to oldest active timestamp tracking in Zookeeper
tx2.close();
TestTransaction tx3 = new TestTransaction(env);
- Assert.assertEquals(9, tx3.get().row("bob").col(BALANCE).toInteger(0));
- Assert.assertEquals(22, tx3.get().row("joe").col(BALANCE).toInteger(0));
- Assert.assertEquals(59, tx3.get().row("jill").col(BALANCE).toInteger(0));
+ Assert.assertEquals("9", tx3.gets("bob", BALANCE));
+ Assert.assertEquals("22", tx3.gets("joe", BALANCE));
+ Assert.assertEquals("59", tx3.gets("jill", BALANCE));
}
@Test(timeout = 60000)
@@ -489,18 +488,18 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- tx.mutate().row("bob").col(BALANCE).set(10);
- tx.mutate().row("joe").col(BALANCE).set(20);
- tx.mutate().row("jill").col(BALANCE).set(60);
- tx.mutate().row("john").col(BALANCE).set(3);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
+ tx.set("john", BALANCE, "3");
tx.done();
TestTransaction tx2 = new TestTransaction(env);
- Assert.assertEquals(10, tx2.get().row("bob").col(BALANCE).toInteger(0));
+ Assert.assertEquals("10", tx2.gets("bob", BALANCE));
TestTransaction tx3 = new TestTransaction(env);
- tx3.get().row("john").col(BALANCE).toInteger(0);
+ tx3.gets("john", BALANCE);
BankUtil.transfer(env, "joe", "jill", 1);
BankUtil.transfer(env, "joe", "bob", 1);
@@ -527,17 +526,17 @@ public class FailureIT extends ITBaseImpl {
conn.tableOperations().flush(table, null, null, true);
// this data should have been GCed, but the problem is not detected here
- Assert.assertNull(tx2.get().row("joe").col(BALANCE).toInteger());
+ Assert.assertNull(tx2.gets("joe", BALANCE));
try {
// closing should detect the stale scan
tx2.close();
Assert.assertFalse(true);
} catch (StaleScanException sse) {
-
+ // do nothing
}
- tx3.mutate().row("john").col(BALANCE).set(5l);
+ tx3.set("john", BALANCE, "5");
try {
tx3.commit();
@@ -549,10 +548,10 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx4 = new TestTransaction(env);
- Assert.assertEquals(9, tx4.get().row("bob").col(BALANCE).toInteger(0));
- Assert.assertEquals(22, tx4.get().row("joe").col(BALANCE).toInteger(0));
- Assert.assertEquals(59, tx4.get().row("jill").col(BALANCE).toInteger(0));
- Assert.assertEquals(3, tx4.get().row("john").col(BALANCE).toInteger(0));
+ Assert.assertEquals("9", tx4.gets("bob", BALANCE));
+ Assert.assertEquals("22", tx4.gets("joe", BALANCE));
+ Assert.assertEquals("59", tx4.gets("jill", BALANCE));
+ Assert.assertEquals("3", tx4.gets("john", BALANCE));
}
@Test
@@ -560,9 +559,9 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("bob").col(BALANCE).set(10);
- tx1.mutate().row("joe").col(BALANCE).set(20);
- tx1.mutate().row("jill").col(BALANCE).set(60);
+ tx1.set("bob", BALANCE, "10");
+ tx1.set("joe", BALANCE, "20");
+ tx1.set("jill", BALANCE, "60");
CommitData cd = tx1.createCommitData();
Assert.assertTrue(tx1.preCommit(cd));
@@ -570,23 +569,23 @@ public class FailureIT extends ITBaseImpl {
while (true) {
TestTransaction tx2 = new TestTransaction(env);
- tx2.mutate().row("bob").col(BALANCE).set(11);
- tx2.mutate().row("jill").col(BALANCE).set(61);
+ tx2.set("bob", BALANCE, "11");
+ tx2.set("jill", BALANCE, "61");
// tx1 should be rolled back even in case where columns tx1 locked are not read by tx2
try {
tx2.commit();
break;
} catch (CommitException ce) {
-
+ // do nothing
}
}
TestTransaction tx4 = new TestTransaction(env);
- Assert.assertEquals(11, tx4.get().row("bob").col(BALANCE).toInteger(0));
- Assert.assertNull(tx4.get().row("joe").col(BALANCE).toInteger());
- Assert.assertEquals(61, tx4.get().row("jill").col(BALANCE).toInteger(0));
+ Assert.assertEquals("11", tx4.gets("bob", BALANCE));
+ Assert.assertNull(tx4.gets("joe", BALANCE));
+ Assert.assertEquals("61", tx4.gets("jill", BALANCE));
}
@Test
@@ -596,9 +595,9 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("bob").col(BALANCE).set(10);
- tx1.mutate().row("joe").col(BALANCE).set(20);
- tx1.mutate().row("jill").col(BALANCE).set(60);
+ tx1.set("bob", BALANCE, "10");
+ tx1.set("joe", BALANCE, "20");
+ tx1.set("jill", BALANCE, "60");
tx1.done();
@@ -606,13 +605,12 @@ public class FailureIT extends ITBaseImpl {
TestTransaction tx2 = new TestTransaction(env, "jill", BALANCE, 1);
TestTransaction tx3 = new TestTransaction(env);
- tx3.mutate().row("bob").col(BALANCE).increment(5);
- tx3.mutate().row("joe").col(BALANCE).increment(-5);
+ TestUtil.increment(tx3, "bob", BALANCE, 5);
+ TestUtil.increment(tx3, "joe", BALANCE, -5);
tx3.done();
- tx2.mutate().row("bob").col(BALANCE).increment(5);
- tx2.mutate().row("jill").col(BALANCE).increment(-5);
-
+ TestUtil.increment(tx2, "bob", BALANCE, 5);
+ TestUtil.increment(tx2, "jill", BALANCE, -5);
// should be able to successfully lock the primary column jill... but then should fail to lock
// bob and have to rollback
@@ -620,10 +618,9 @@ public class FailureIT extends ITBaseImpl {
tx2.commit();
Assert.fail("Expected commit exception");
} catch (CommitException ce) {
-
+ // do nothing
}
-
boolean sawExpected = wasRolledBackPrimary(tx2.getStartTimestamp(), "jill");
Assert.assertTrue(sawExpected);
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java
index 7486b5a..4da6b08 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FaultyConfig.java
@@ -43,13 +43,11 @@ public class FaultyConfig extends Environment {
private Random rand;
private double wp;
- public FaultyConditionalWriter(ConditionalWriter cw, double unknownProbability,
- double writeProbability) {
+ FaultyConditionalWriter(ConditionalWriter cw, double unknownProbability, double writeProbability) {
this.cw = cw;
this.up = unknownProbability;
this.wp = writeProbability;
this.rand = new Random();
-
}
@Override
@@ -93,7 +91,7 @@ public class FaultyConfig extends Environment {
private double up;
private double wp;
- public FaultyConfig(Environment env, double up, double wp) throws Exception {
+ FaultyConfig(Environment env, double up, double wp) throws Exception {
super(env);
this.up = up;
this.wp = wp;
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index 661ebf2..321374b 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -28,6 +28,7 @@ import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.config.ScannerConfiguration;
@@ -37,33 +38,30 @@ import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedObserver;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.fluo.integration.TestTransaction;
+import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
import org.junit.Test;
-public class FluoIT extends ITBaseImpl {
+import static org.apache.fluo.integration.BankUtil.BALANCE;
- static TypeLayer typeLayer = new TypeLayer(new StringEncoder());
+public class FluoIT extends ITBaseImpl {
- public static class BalanceObserver extends TypedObserver {
+ public static class BalanceObserver extends AbstractObserver {
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(typeLayer.bc().fam("account").qual("balance").vis(),
- NotificationType.STRONG);
+ return new ObservedColumn(BALANCE, NotificationType.STRONG);
}
@Override
- public void process(TypedTransactionBase tx, Bytes row, Column col) {
+ public void process(TransactionBase tx, Bytes row, Column col) {
Assert.fail();
}
}
@@ -102,42 +100,36 @@ public class FluoIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- Column balanceCol = typeLayer.bc().fam("account").qual("balance").vis();
-
- tx.mutate().row("bob").col(balanceCol).set(10);
- tx.mutate().row("joe").col(balanceCol).set(20);
- tx.mutate().row("jill").col(balanceCol).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
tx = new TestTransaction(env);
- int bal1 = tx.get().row("bob").col(balanceCol).toInteger(0);
- int bal2 = tx.get().row("joe").col(balanceCol).toInteger(0);
- Assert.assertEquals(10, bal1);
- Assert.assertEquals(20, bal2);
+ Assert.assertEquals("10", tx.gets("bob", BALANCE));
+ Assert.assertEquals("20", tx.gets("joe", BALANCE));
- tx.mutate().row("bob").col(balanceCol).set(bal1 - 5);
- tx.mutate().row("joe").col(balanceCol).set(bal2 + 5);
+ TestUtil.increment(tx, "bob", BALANCE, -5);
+ TestUtil.increment(tx, "joe", BALANCE, 5);
TestTransaction tx2 = new TestTransaction(env);
- int bal3 = tx2.get().row("bob").col(balanceCol).toInteger(0);
- int bal4 = tx2.get().row("jill").col(balanceCol).toInteger(0);
- Assert.assertEquals(10, bal3);
- Assert.assertEquals(60, bal4);
+ Assert.assertEquals("10", tx2.gets("bob", BALANCE));
+ Assert.assertEquals("60", tx2.gets("jill", BALANCE));
- tx2.mutate().row("bob").col(balanceCol).set(bal3 - 5);
- tx2.mutate().row("jill").col(balanceCol).set(bal4 + 5);
+ TestUtil.increment(tx2, "bob", BALANCE, -5);
+ TestUtil.increment(tx2, "jill", BALANCE, 5);
tx2.done();
assertCommitFails(tx);
TestTransaction tx3 = new TestTransaction(env);
- Assert.assertEquals(5, tx3.get().row("bob").col(balanceCol).toInteger(0));
- Assert.assertEquals(20, tx3.get().row("joe").col(balanceCol).toInteger(0));
- Assert.assertEquals(65, tx3.get().row("jill").col(balanceCol).toInteger(0));
+ Assert.assertEquals("5", tx3.gets("bob", BALANCE));
+ Assert.assertEquals("20", tx3.gets("joe", BALANCE));
+ Assert.assertEquals("65", tx3.gets("jill", BALANCE));
tx3.done();
}
@@ -168,12 +160,10 @@ public class FluoIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- Column balanceCol = typeLayer.bc().fam("account").qual("balance").vis();
-
- tx.mutate().row("bob").col(balanceCol).set(10);
- tx.mutate().row("joe").col(balanceCol).set(20);
- tx.mutate().row("jill").col(balanceCol).set(60);
- tx.mutate().row("jane").col(balanceCol).set(0);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
+ tx.set("jane", BALANCE, "0");
tx.done();
@@ -181,51 +171,45 @@ public class FluoIT extends ITBaseImpl {
TestTransaction tx2 = new TestTransaction(env);
- tx2.mutate().row("bob").col(balanceCol).set(tx2.get().row("bob").col(balanceCol).toLong() - 5);
- tx2.mutate().row("joe").col(balanceCol).set(tx2.get().row("joe").col(balanceCol).toLong() - 5);
- tx2.mutate().row("jill").col(balanceCol)
- .set(tx2.get().row("jill").col(balanceCol).toLong() + 10);
+ TestUtil.increment(tx2, "bob", BALANCE, -5);
+ TestUtil.increment(tx2, "joe", BALANCE, -5);
+ TestUtil.increment(tx2, "jill", BALANCE, 10);
- long bal1 = tx1.get().row("bob").col(balanceCol).toLong();
+ Assert.assertEquals("10", tx1.gets("bob", BALANCE));
tx2.done();
TestTransaction txd = new TestTransaction(env);
- txd.mutate().row("jane").col(balanceCol).delete();
+ txd.delete("jane", BALANCE);
txd.done();
- long bal2 = tx1.get().row("joe").col(balanceCol).toLong();
- long bal3 = tx1.get().row("jill").col(balanceCol).toLong();
- long bal4 = tx1.get().row("jane").col(balanceCol).toLong();
+ Assert.assertEquals("20", tx1.gets("joe", BALANCE));
+ Assert.assertEquals("60", tx1.gets("jill", BALANCE));
+ Assert.assertEquals("0", tx1.gets("jane", BALANCE));
- Assert.assertEquals(10l, bal1);
- Assert.assertEquals(20l, bal2);
- Assert.assertEquals(60l, bal3);
- Assert.assertEquals(0l, bal4);
-
- tx1.mutate().row("bob").col(balanceCol).set(bal1 - 5);
- tx1.mutate().row("joe").col(balanceCol).set(bal2 + 5);
+ tx1.set("bob", BALANCE, "5");
+ tx1.set("joe", BALANCE, "25");
assertCommitFails(tx1);
TestTransaction tx3 = new TestTransaction(env);
TestTransaction tx4 = new TestTransaction(env);
- tx4.mutate().row("jane").col(balanceCol).set(3);
+ tx4.set("jane", BALANCE, "3");
tx4.done();
- Assert.assertEquals(5l, tx3.get().row("bob").col(balanceCol).toLong(0));
- Assert.assertEquals(15l, tx3.get().row("joe").col(balanceCol).toLong(0));
- Assert.assertEquals(70l, tx3.get().row("jill").col(balanceCol).toLong(0));
- Assert.assertNull(tx3.get().row("jane").col(balanceCol).toLong());
+ Assert.assertEquals("5", tx3.gets("bob", BALANCE));
+ Assert.assertEquals("15", tx3.gets("joe", BALANCE));
+ Assert.assertEquals("70", tx3.gets("jill", BALANCE));
+ Assert.assertNull(tx3.gets("jane", BALANCE));
tx3.done();
TestTransaction tx5 = new TestTransaction(env);
- Assert.assertEquals(5l, tx5.get().row("bob").col(balanceCol).toLong(0));
- Assert.assertEquals(15l, tx5.get().row("joe").col(balanceCol).toLong(0));
- Assert.assertEquals(70l, tx5.get().row("jill").col(balanceCol).toLong(0));
- Assert.assertEquals(3l, tx5.get().row("jane").col(balanceCol).toLong(0));
+ Assert.assertEquals("5", tx5.gets("bob", BALANCE));
+ Assert.assertEquals("15", tx5.gets("joe", BALANCE));
+ Assert.assertEquals("70", tx5.gets("jill", BALANCE));
+ Assert.assertEquals("3", tx5.gets("jane", BALANCE));
tx5.done();
}
@@ -235,45 +219,43 @@ public class FluoIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- Column balanceCol = typeLayer.bc().fam("account").qual("balance").vis();
-
- tx.mutate().row("bob").col(balanceCol).set(10);
- tx.mutate().row("joe").col(balanceCol).set(20);
- tx.mutate().row("jill").col(balanceCol).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
- TestTransaction tx1 = new TestTransaction(env, "joe", balanceCol);
- tx1.get().row("joe").col(balanceCol);
- tx1.mutate().row("jill").col(balanceCol).set(61);
+ TestTransaction tx1 = new TestTransaction(env, "joe", BALANCE);
+ tx1.gets("joe", BALANCE);
+ tx1.set("jill", BALANCE, "61");
- TestTransaction tx2 = new TestTransaction(env, "joe", balanceCol);
- tx2.get().row("joe").col(balanceCol);
- tx2.mutate().row("bob").col(balanceCol).set(11);
+ TestTransaction tx2 = new TestTransaction(env, "joe", BALANCE);
+ tx2.gets("joe", BALANCE);
+ tx2.set("bob", BALANCE, "11");
tx1.done();
assertAAck(tx2);
TestTransaction tx3 = new TestTransaction(env);
- Assert.assertEquals(10, tx3.get().row("bob").col(balanceCol).toInteger(0));
- Assert.assertEquals(20, tx3.get().row("joe").col(balanceCol).toInteger(0));
- Assert.assertEquals(61, tx3.get().row("jill").col(balanceCol).toInteger(0));
+ Assert.assertEquals("10", tx3.gets("bob", BALANCE));
+ Assert.assertEquals("20", tx3.gets("joe", BALANCE));
+ Assert.assertEquals("61", tx3.gets("jill", BALANCE));
// update joe, so it can be acknowledged again
- tx3.mutate().row("joe").col(balanceCol).set(21);
+ tx3.set("joe", BALANCE, "21");
tx3.done();
- TestTransaction tx4 = new TestTransaction(env, "joe", balanceCol);
- tx4.get().row("joe").col(balanceCol);
- tx4.mutate().row("jill").col(balanceCol).set(62);
+ TestTransaction tx4 = new TestTransaction(env, "joe", BALANCE);
+ tx4.gets("joe", BALANCE);
+ tx4.set("jill", BALANCE, "62");
- TestTransaction tx5 = new TestTransaction(env, "joe", balanceCol);
- tx5.get().row("joe").col(balanceCol);
- tx5.mutate().row("bob").col(balanceCol).set(11);
+ TestTransaction tx5 = new TestTransaction(env, "joe", BALANCE);
+ tx5.gets("joe", BALANCE);
+ tx5.set("bob", BALANCE, "11");
- TestTransaction tx7 = new TestTransaction(env, "joe", balanceCol);
+ TestTransaction tx7 = new TestTransaction(env, "joe", BALANCE);
// make the 2nd transaction to start commit 1st
tx5.done();
@@ -281,22 +263,22 @@ public class FluoIT extends ITBaseImpl {
TestTransaction tx6 = new TestTransaction(env);
- Assert.assertEquals(11, tx6.get().row("bob").col(balanceCol).toInteger(0));
- Assert.assertEquals(21, tx6.get().row("joe").col(balanceCol).toInteger(0));
- Assert.assertEquals(61, tx6.get().row("jill").col(balanceCol).toInteger(0));
+ Assert.assertEquals("11", tx6.gets("bob", BALANCE));
+ Assert.assertEquals("21", tx6.gets("joe", BALANCE));
+ Assert.assertEquals("61", tx6.gets("jill", BALANCE));
tx6.done();
- tx7.get().row("joe").col(balanceCol);
- tx7.mutate().row("bob").col(balanceCol).set(15);
- tx7.mutate().row("jill").col(balanceCol).set(60);
+ tx7.gets("joe", BALANCE);
+ tx7.set("bob", BALANCE, "15");
+ tx7.set("jill", BALANCE, "60");
assertAAck(tx7);
TestTransaction tx8 = new TestTransaction(env);
- Assert.assertEquals(11, tx8.get().row("bob").col(balanceCol).toInteger(0));
- Assert.assertEquals(21, tx8.get().row("joe").col(balanceCol).toInteger(0));
- Assert.assertEquals(61, tx8.get().row("jill").col(balanceCol).toInteger(0));
+ Assert.assertEquals("11", tx8.gets("bob", BALANCE));
+ Assert.assertEquals("21", tx8.gets("joe", BALANCE));
+ Assert.assertEquals("61", tx8.gets("jill", BALANCE));
tx8.done();
}
@@ -304,27 +286,26 @@ public class FluoIT extends ITBaseImpl {
public void testAck2() throws Exception {
TestTransaction tx = new TestTransaction(env);
- Column balanceCol = typeLayer.bc().fam("account").qual("balance").vis();
- Column addrCol = typeLayer.bc().fam("account").qual("addr").vis();
+ Column addrCol = new Column("account", "addr");
- tx.mutate().row("bob").col(balanceCol).set(10);
- tx.mutate().row("joe").col(balanceCol).set(20);
- tx.mutate().row("jill").col(balanceCol).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
- TestTransaction tx1 = new TestTransaction(env, "bob", balanceCol);
- TestTransaction tx2 = new TestTransaction(env, "bob", balanceCol);
- TestTransaction tx3 = new TestTransaction(env, "bob", balanceCol);
+ TestTransaction tx1 = new TestTransaction(env, "bob", BALANCE);
+ TestTransaction tx2 = new TestTransaction(env, "bob", BALANCE);
+ TestTransaction tx3 = new TestTransaction(env, "bob", BALANCE);
- tx1.get().row("bob").col(balanceCol).toInteger();
- tx2.get().row("bob").col(balanceCol).toInteger();
+ tx1.gets("bob", BALANCE);
+ tx2.gets("bob", BALANCE);
- tx1.get().row("bob").col(addrCol).toInteger();
- tx2.get().row("bob").col(addrCol).toInteger();
+ tx1.gets("bob", addrCol);
+ tx2.gets("bob", addrCol);
- tx1.mutate().row("bob").col(addrCol).set("1 loop pl");
- tx2.mutate().row("bob").col(addrCol).set("1 loop pl");
+ tx1.set("bob", addrCol, "1 loop pl");
+ tx2.set("bob", addrCol, "1 loop pl");
// this test overlaps the commits of two transactions w/ the same trigger
@@ -338,7 +319,7 @@ public class FluoIT extends ITBaseImpl {
tx1.finishCommit(cd, commitTs);
tx1.close();
- tx3.mutate().row("bob").col(addrCol).set("2 loop pl");
+ tx3.set("bob", addrCol, "2 loop pl");
assertAAck(tx3);
}
@@ -346,22 +327,20 @@ public class FluoIT extends ITBaseImpl {
public void testAck3() throws Exception {
TestTransaction tx = new TestTransaction(env);
- Column balanceCol = typeLayer.bc().fam("account").qual("balance").vis();
-
- tx.mutate().row("bob").col(balanceCol).set(10);
- tx.mutate().row("joe").col(balanceCol).set(20);
- tx.mutate().row("jill").col(balanceCol).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
- long notTS1 = TestTransaction.getNotificationTS(env, "bob", balanceCol);
+ long notTS1 = TestTransaction.getNotificationTS(env, "bob", BALANCE);
// this transaction should create a second notification
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("bob").col(balanceCol).set(11);
+ tx1.set("bob", BALANCE, "11");
tx1.done();
- long notTS2 = TestTransaction.getNotificationTS(env, "bob", balanceCol);
+ long notTS2 = TestTransaction.getNotificationTS(env, "bob", BALANCE);
Assert.assertTrue(notTS1 < notTS2);
@@ -369,17 +348,17 @@ public class FluoIT extends ITBaseImpl {
// should execute
// google paper calls this message collapsing
- TestTransaction tx3 = new TestTransaction(env, "bob", balanceCol, notTS1);
+ TestTransaction tx3 = new TestTransaction(env, "bob", BALANCE, notTS1);
- TestTransaction tx2 = new TestTransaction(env, "bob", balanceCol, notTS1);
- Assert.assertEquals(11, tx2.get().row("bob").col(balanceCol).toInteger(0));
+ TestTransaction tx2 = new TestTransaction(env, "bob", BALANCE, notTS1);
+ Assert.assertEquals("11", tx2.gets("bob", BALANCE));
tx2.done();
- Assert.assertEquals(11, tx3.get().row("bob").col(balanceCol).toInteger(0));
+ Assert.assertEquals("11", tx3.gets("bob", BALANCE));
assertAAck(tx3);
- TestTransaction tx4 = new TestTransaction(env, "bob", balanceCol, notTS2);
- Assert.assertEquals(11, tx4.get().row("bob").col(balanceCol).toInteger(0));
+ TestTransaction tx4 = new TestTransaction(env, "bob", BALANCE, notTS2);
+ Assert.assertEquals("11", tx4.gets("bob", BALANCE));
assertAAck(tx4);
}
@@ -390,31 +369,29 @@ public class FluoIT extends ITBaseImpl {
TestTransaction tx = new TestTransaction(env);
- Column balanceCol = typeLayer.bc().fam("account").qual("balance").vis();
-
- tx.mutate().row("bob").col(balanceCol).set(10);
- tx.mutate().row("joe").col(balanceCol).set(20);
- tx.mutate().row("jill").col(balanceCol).set(60);
+ tx.set("bob", BALANCE, "10");
+ tx.set("joe", BALANCE, "20");
+ tx.set("jill", BALANCE, "60");
tx.done();
- TestTransaction tx2 = new TestTransaction(env, "joe", balanceCol);
- tx2.get().row("joe").col(balanceCol);
- tx2.mutate().row("joe").col(balanceCol).set(21);
- tx2.mutate().row("bob").col(balanceCol).set(11);
+ TestTransaction tx2 = new TestTransaction(env, "joe", BALANCE);
+ tx2.gets("joe", BALANCE);
+ tx2.set("joe", BALANCE, "21");
+ tx2.set("bob", BALANCE, "11");
- TestTransaction tx1 = new TestTransaction(env, "joe", balanceCol);
- tx1.get().row("joe").col(balanceCol);
- tx1.mutate().row("jill").col(balanceCol).set(61);
+ TestTransaction tx1 = new TestTransaction(env, "joe", BALANCE);
+ tx1.gets("joe", BALANCE);
+ tx1.set("jill", BALANCE, "61");
tx1.done();
assertAAck(tx2);
TestTransaction tx3 = new TestTransaction(env);
- Assert.assertEquals(10, tx3.get().row("bob").col(balanceCol).toInteger(0));
- Assert.assertEquals(20, tx3.get().row("joe").col(balanceCol).toInteger(0));
- Assert.assertEquals(61, tx3.get().row("jill").col(balanceCol).toInteger(0));
+ Assert.assertEquals("10", tx3.gets("bob", BALANCE));
+ Assert.assertEquals("20", tx3.gets("joe", BALANCE));
+ Assert.assertEquals("61", tx3.gets("jill", BALANCE));
tx3.done();
}
@@ -426,13 +403,13 @@ public class FluoIT extends ITBaseImpl {
env.setAuthorizations(new Authorizations("A", "B", "C"));
- Column balanceCol = typeLayer.bc().fam("account").qual("balance").vis("A|B");
+ Column balanceCol = new Column("account", "balance", "A|B");
TestTransaction tx = new TestTransaction(env);
- tx.mutate().row("bob").col(balanceCol).set(10);
- tx.mutate().row("joe").col(balanceCol).set(20);
- tx.mutate().row("jill").col(balanceCol).set(60);
+ tx.set("bob", balanceCol, "10");
+ tx.set("joe", balanceCol, "20");
+ tx.set("jill", balanceCol, "60");
tx.done();
@@ -441,9 +418,9 @@ public class FluoIT extends ITBaseImpl {
env2.setAuthorizations(new Authorizations("B"));
TestTransaction tx2 = new TestTransaction(env2);
- Assert.assertEquals(10, tx2.get().row("bob").col(balanceCol).toInteger(0));
- Assert.assertEquals(20, tx2.get().row("joe").col(balanceCol).toInteger(0));
- Assert.assertEquals(60, tx2.get().row("jill").col(balanceCol).toInteger(0));
+ Assert.assertEquals("10", tx2.gets("bob", balanceCol));
+ Assert.assertEquals("20", tx2.gets("joe", balanceCol));
+ Assert.assertEquals("60", tx2.gets("jill", balanceCol));
tx2.done();
env2.close();
@@ -451,9 +428,9 @@ public class FluoIT extends ITBaseImpl {
env3.setAuthorizations(new Authorizations("C"));
TestTransaction tx3 = new TestTransaction(env3);
- Assert.assertNull(tx3.get().row("bob").col(balanceCol).toInteger());
- Assert.assertNull(tx3.get().row("joe").col(balanceCol).toInteger());
- Assert.assertNull(tx3.get().row("jill").col(balanceCol).toInteger());
+ Assert.assertNull(tx3.gets("bob", balanceCol));
+ Assert.assertNull(tx3.gets("joe", balanceCol));
+ Assert.assertNull(tx3.gets("jill", balanceCol));
tx3.done();
env3.close();
}
@@ -464,17 +441,17 @@ public class FluoIT extends ITBaseImpl {
// status
TestTransaction tx = new TestTransaction(env);
- tx.mutate().row("d00001").fam("data").qual("content")
- .set("blah blah, blah http://a.com. Blah blah http://b.com. Blah http://c.com");
- tx.mutate().row("d00001").fam("outlink").qual("http://a.com").set("");
- tx.mutate().row("d00001").fam("outlink").qual("http://b.com").set("");
- tx.mutate().row("d00001").fam("outlink").qual("http://c.com").set("");
-
- tx.mutate().row("d00002").fam("data").qual("content")
- .set("blah blah, blah http://d.com. Blah blah http://e.com. Blah http://c.com");
- tx.mutate().row("d00002").fam("outlink").qual("http://d.com").set("");
- tx.mutate().row("d00002").fam("outlink").qual("http://e.com").set("");
- tx.mutate().row("d00002").fam("outlink").qual("http://c.com").set("");
+ tx.set("d00001", new Column("data", "content"),
+ "blah blah, blah http://a.com. Blah blah http://b.com. Blah http://c.com");
+ tx.set("d00001", new Column("outlink", "http://a.com"), "");
+ tx.set("d00001", new Column("outlink", "http://b.com"), "");
+ tx.set("d00001", new Column("outlink", "http://c.com"), "");
+
+ tx.set("d00002", new Column("data", "content"),
+ "blah blah, blah http://d.com. Blah blah http://e.com. Blah http://c.com");
+ tx.set("d00002", new Column("outlink", "http://d.com"), "");
+ tx.set("d00002", new Column("outlink", "http://e.com"), "");
+ tx.set("d00002", new Column("outlink", "http://c.com"), "");
tx.done();
@@ -482,12 +459,12 @@ public class FluoIT extends ITBaseImpl {
TestTransaction tx3 = new TestTransaction(env);
- tx3.mutate().row("d00001").fam("data").qual("content")
- .set("blah blah, blah http://a.com. Blah http://c.com . Blah http://z.com");
- tx3.mutate().row("d00001").fam("outlink").qual("http://a.com").set("");
- tx3.mutate().row("d00001").fam("outlink").qual("http://b.com").delete();
- tx3.mutate().row("d00001").fam("outlink").qual("http://c.com").set("");
- tx3.mutate().row("d00001").fam("outlink").qual("http://z.com").set("");
+ tx3.set("d00001", new Column("data", "content"),
+ "blah blah, blah http://a.com. Blah http://c.com . Blah http://z.com");
+ tx3.set("d00001", new Column("outlink", "http://a.com"), "");
+ tx3.delete("d00001", new Column("outlink", "http://b.com"));
+ tx3.set("d00001", new Column("outlink", "http://c.com"), "");
+ tx3.set("d00001", new Column("outlink", "http://z.com"), "");
tx3.done();
@@ -505,9 +482,9 @@ public class FluoIT extends ITBaseImpl {
tx2.done();
HashSet<Column> expected = new HashSet<>();
- expected.add(typeLayer.bc().fam("outlink").qual("http://a.com").vis());
- expected.add(typeLayer.bc().fam("outlink").qual("http://b.com").vis());
- expected.add(typeLayer.bc().fam("outlink").qual("http://c.com").vis());
+ expected.add(new Column("outlink", "http://a.com"));
+ expected.add(new Column("outlink", "http://b.com"));
+ expected.add(new Column("outlink", "http://c.com"));
Assert.assertEquals(expected, columns);
@@ -522,8 +499,8 @@ public class FluoIT extends ITBaseImpl {
columns.add(citer.next().getKey());
}
}
- expected.add(typeLayer.bc().fam("outlink").qual("http://z.com").vis());
- expected.remove(typeLayer.bc().fam("outlink").qual("http://b.com").vis());
+ expected.add(new Column("outlink", "http://z.com"));
+ expected.remove(new Column("outlink", "http://b.com"));
Assert.assertEquals(expected, columns);
tx4.done();
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
index d83c6f1..0451f16 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/GarbageCollectionIteratorIT.java
@@ -90,14 +90,17 @@ public class GarbageCollectionIteratorIT extends ITBaseImpl {
@Test(timeout = 60000)
public void testDeletedDataIsDropped() throws Exception {
+
+ final Column docUri = new Column("doc", "uri");
+
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("001").fam("doc").qual("uri").set("file:///abc.txt");
+ tx1.set("001", docUri, "file:///abc.txt");
tx1.done();
TestTransaction tx2 = new TestTransaction(env);
TestTransaction tx3 = new TestTransaction(env);
- tx3.mutate().row("001").fam("doc").qual("uri").delete();
+ tx3.delete("001", docUri);
tx3.done();
TestTransaction tx4 = new TestTransaction(env);
@@ -107,16 +110,16 @@ public class GarbageCollectionIteratorIT extends ITBaseImpl {
// Force a garbage collection
conn.tableOperations().compact(table, null, null, true, true);
- Assert.assertEquals("file:///abc.txt", tx2.get().row("001").fam("doc").qual("uri").toString());
+ Assert.assertEquals("file:///abc.txt", tx2.gets("001", docUri));
tx2.done();
- Assert.assertNull(tx4.get().row("001").fam("doc").qual("uri").toString());
+ Assert.assertNull(tx4.gets("001", docUri));
waitForGcTime(tx4.getStartTimestamp());
conn.tableOperations().compact(table, null, null, true, true);
- Assert.assertNull(tx4.get().row("001").fam("doc").qual("uri").toString());
+ Assert.assertNull(tx4.gets("001", docUri));
Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
Assert.assertEquals(0, Iterables.size(scanner));
@@ -134,8 +137,8 @@ public class GarbageCollectionIteratorIT extends ITBaseImpl {
TestTransaction tx2 = new TestTransaction(env, t2);
for (int r = 0; r < 10; r++) {
- tx2.mutate().row(r + "").col(col1).set("1" + r + "0");
- tx2.mutate().row(r + "").col(col2).set("1" + r + "1");
+ tx2.set(r + "", col1, "1" + r + "0");
+ tx2.set(r + "", col2, "1" + r + "1");
}
CommitData cd = tx2.createCommitData();
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
index 81a5a93..37bca0a 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.data.Column;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.ByteUtil;
@@ -36,7 +37,7 @@ import org.junit.Test;
public class NotificationGcIT extends ITBaseMini {
- public static void assertRawNotifications(int expected, Environment env) throws Exception {
+ private static void assertRawNotifications(int expected, Environment env) throws Exception {
Scanner scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
scanner.fetchColumnFamily(ByteUtil.toText(ColumnConstants.NOTIFY_CF));
int size = Iterables.size(scanner);
@@ -48,7 +49,7 @@ public class NotificationGcIT extends ITBaseMini {
Assert.assertEquals(expected, size);
}
- public static int countNotifications(Environment env) throws Exception {
+ private static int countNotifications(Environment env) throws Exception {
Scanner scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
Notification.configureScanner(scanner);
return Iterables.size(scanner);
@@ -61,31 +62,35 @@ public class NotificationGcIT extends ITBaseMini {
@Test
public void testNotificationGC() throws Exception {
+
+ final Column statCount = new Column("stat", "count");
+ final Column statCheck = new Column("stat", "check");
+
Environment env = new Environment(config);
TestTransaction tx1 = new TestTransaction(env);
- tx1.mutate().row("r1").fam("stat").qual("count").set(3);
+ tx1.set("r1", statCount, 3 + "");
tx1.done();
TestTransaction tx2 = new TestTransaction(env);
- tx2.mutate().row("r2").fam("stat").qual("count").set(7);
+ tx2.set("r2", statCount, 7 + "");
tx2.done();
TestTransaction tx3 = new TestTransaction(env);
- tx3.mutate().row("r1").fam("stats").qual("af89").set(5);
- tx3.mutate().row("r1").fam("stat").qual("check").weaklyNotify();
+ tx3.set("r1", new Column("stats", "af89"), 5 + "");
+ tx3.setWeakNotification("r1", statCheck);
tx3.done();
TestTransaction tx4 = new TestTransaction(env);
- tx4.mutate().row("r2").fam("stats").qual("af99").set(7);
- tx4.mutate().row("r2").fam("stat").qual("check").weaklyNotify();
+ tx4.set("r2", new Column("stats", "af99"), 7 + "");
+ tx4.setWeakNotification("r2", statCheck);
tx4.done();
miniFluo.waitForObservers();
TestTransaction tx5 = new TestTransaction(env);
- Assert.assertEquals(8, tx5.get().row("r1").fam("stat").qual("count").toInteger(0));
- Assert.assertEquals(14, tx5.get().row("r2").fam("stat").qual("count").toInteger(0));
+ Assert.assertEquals("8", tx5.gets("r1", statCount));
+ Assert.assertEquals("14", tx5.gets("r2", statCount));
assertRawNotifications(4, env);
Assert.assertEquals(0, countNotifications(env));
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
index 79b70e8..9766f9b 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
@@ -20,24 +20,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.api.observer.Observer.NotificationType;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
-import org.apache.fluo.api.types.TypedSnapshot;
-import org.apache.fluo.api.types.TypedTransaction;
import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
import org.junit.Test;
public class ObserverConfigIT extends ITBaseMini {
- private static TypeLayer tl = new TypeLayer(new StringEncoder());
-
public static class ConfigurableObserver extends AbstractObserver {
private ObservedColumn observedColumn;
@@ -48,7 +44,7 @@ public class ObserverConfigIT extends ITBaseMini {
public void init(Context context) {
String ocTokens[] = context.getParameters().get("observedCol").split(":");
observedColumn =
- new ObservedColumn(tl.bc().fam(ocTokens[0]).qual(ocTokens[1]).vis(),
+ new ObservedColumn(new Column(ocTokens[0], ocTokens[1]),
NotificationType.valueOf(ocTokens[2]));
outputCQ = Bytes.of(context.getParameters().get("outputCQ"));
String swn = context.getParameters().get("setWeakNotification");
@@ -78,7 +74,7 @@ public class ObserverConfigIT extends ITBaseMini {
}
}
- Map<String, String> newMap(String... args) {
+ private Map<String, String> newMap(String... args) {
HashMap<String, String> ret = new HashMap<>();
for (int i = 0; i < args.length; i += 2) {
ret.put(args[i], args[i + 1]);
@@ -107,18 +103,18 @@ public class ObserverConfigIT extends ITBaseMini {
@Test
public void testObserverConfig() throws Exception {
- try (TypedTransaction tx1 = tl.wrap(client.newTransaction())) {
- tx1.mutate().row("r1").fam("fam1").qual("col1").set("abcdefg");
+ try (Transaction tx1 = client.newTransaction()) {
+ tx1.set("r1", new Column("fam1", "col1"), "abcdefg");
tx1.commit();
}
miniFluo.waitForObservers();
- try (TypedSnapshot tx2 = tl.wrap(client.newSnapshot())) {
- Assert.assertNull(tx2.get().row("r1").fam("fam1").qual("col1").toString());
- Assert.assertNull(tx2.get().row("r1").fam("fam1").qual("col2").toString());
- Assert.assertNull(tx2.get().row("r1").fam("fam1").qual("col3").toString());
- Assert.assertEquals("abcdefg", tx2.get().row("r1").fam("fam1").qual("col4").toString());
+ try (Snapshot tx2 = client.newSnapshot()) {
+ Assert.assertNull(tx2.gets("r1", new Column("fam1", "col1")));
+ Assert.assertNull(tx2.gets("r1", new Column("fam1", "col2")));
+ Assert.assertNull(tx2.gets("r1", new Column("fam1", "col3")));
+ Assert.assertEquals("abcdefg", tx2.gets("r1", new Column("fam1", "col4")));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/b2c91b95/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
index 857b48d..91636f7 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
@@ -42,9 +42,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-/**
- *
- */
public class OracleIT extends ITBaseImpl {
@Test