You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2017/01/25 17:24:23 UTC
[4/5] accumulo-testing git commit: ACCUMULO-4510 Refactored Continous
Ingest tests
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/start-stats.sh
----------------------------------------------------------------------
diff --git a/continuous/start-stats.sh b/continuous/start-stats.sh
deleted file mode 100755
index 0a90364..0000000
--- a/continuous/start-stats.sh
+++ /dev/null
@@ -1,49 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
- bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
- SOURCE=$(readlink "${SOURCE}")
- [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
-script=$( basename "${SOURCE}" )
-# Stop: Resolve Script Directory
-
-CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
-. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
-
-mkdir -p "$CONTINUOUS_LOG_DIR"
-
-CONFIG_OUT=$CONTINUOUS_LOG_DIR/$(date +%Y%m%d%H%M%S)_$(hostname)_config.out
-
-cat "$ACCUMULO_CONF_DIR/accumulo-env.sh" > "$CONFIG_OUT"
-echo >> "$CONFIG_OUT"
-echo -e "config -np\nconfig -t $TABLE -np\nquit" | "$ACCUMULO_HOME/bin/accumulo" shell -u "$USER" -p "$PASS" >> "$CONFIG_OUT"
-echo >> "$CONFIG_OUT"
-cat "$CONTINUOUS_CONF_DIR/continuous-env.sh" >> "$CONFIG_OUT"
-echo >> "$CONFIG_OUT"
-wc -l "$CONTINUOUS_CONF_DIR/walkers.txt" >> "$CONFIG_OUT"
-wc -l "$CONTINUOUS_CONF_DIR/ingesters.txt" >> "$CONFIG_OUT"
-wc -l "$CONTINUOUS_CONF_DIR/scanners.txt" >> "$CONFIG_OUT"
-wc -l "$CONTINUOUS_CONF_DIR/batch_walkers.txt" >> "$CONFIG_OUT"
-
-
-nohup "$ACCUMULO_HOME/bin/accumulo" org.apache.accumulo.test.continuous.ContinuousStatsCollector --table "$TABLE" -i "$INSTANCE_NAME" -z "$ZOO_KEEPERS" -u "$USER" -p "$PASS" >"$CONTINUOUS_LOG_DIR/$(date +%Y%m%d%H%M%S)_$(hostname)_stats.out" 2>"$CONTINUOUS_LOG_DIR/$(date +%Y%m%d%H%M%S)_$(hostname)_stats.err" &
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/start-walkers.sh
----------------------------------------------------------------------
diff --git a/continuous/start-walkers.sh b/continuous/start-walkers.sh
deleted file mode 100755
index d9bbff4..0000000
--- a/continuous/start-walkers.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
- bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
- SOURCE=$(readlink "${SOURCE}")
- [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
-script=$( basename "${SOURCE}" )
-# Stop: Resolve Script Directory
-
-CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
-. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
-
-DEBUG_OPT=''
-if [[ "$DEBUG_WALKER" == "on" ]] ; then
- DEBUG_OPT="--debug $CONTINUOUS_LOG_DIR/\`date +%Y%m%d%H%M%S\`_\`hostname\`_walk.log";
-fi
-
-AUTH_OPT=''
-[[ -n "$AUTHS" ]] && AUTH_OPT="--auths \"$AUTHS\""
-
-pssh -h "$CONTINUOUS_CONF_DIR/walkers.txt" "mkdir -p $CONTINUOUS_LOG_DIR; nohup $ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.continuous.ContinuousWalk $DEBUG_OPT $AUTH_OPT -i $INSTANCE_NAME -z $ZOO_KEEPERS -u $USER -p $PASS --table $TABLE --min $MIN --max $MAX --sleep $SLEEP_TIME >$CONTINUOUS_LOG_DIR/\`date +%Y%m%d%H%M%S\`_\`hostname\`_walk.out 2>$CONTINUOUS_LOG_DIR/\`date +%Y%m%d%H%M%S\`_\`hostname\`_walk.err &" < /dev/null
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-agitator.sh
----------------------------------------------------------------------
diff --git a/continuous/stop-agitator.sh b/continuous/stop-agitator.sh
deleted file mode 100755
index d8f30e4..0000000
--- a/continuous/stop-agitator.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#! /usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
- bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
- SOURCE=$(readlink "${SOURCE}")
- [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
-script=$( basename "${SOURCE}" )
-# Stop: Resolve Script Directory
-
-CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
-. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
-
-# Try to use sudo when we wouldn't normally be able to kill the processes
-[[ -n $AGITATOR_USER ]] || AGITATOR_USER=$(whoami)
-if [[ $AGITATOR_USER == root ]]; then
- echo "Stopping all processes matching 'agitator.pl' as root"
- pkill -f agitator.pl 2>/dev/null
-elif [[ $AGITATOR_USER == "$ACCUMULO_USER" ]]; then
- echo "Stopping all processes matching 'datanode-agitator.pl' as $HDFS_USER"
- sudo -u "$HDFS_USER" pkill -f datanode-agitator.pl 2>/dev/null
- echo "Stopping all processes matching 'hdfs-agitator.pl' as $HDFS_USER"
- sudo -u "$HDFS_USER" pkill -f hdfs-agitator.pl 2>/dev/null
- echo "Stopping all processes matching 'agitator.pl' as $AGITATOR_USER"
- pkill -f agitator.pl 2>/dev/null 2>/dev/null
-else
- echo "Stopping all processes matching 'datanode-agitator.pl' as $HDFS_USER"
- sudo -u "$HDFS_USER" pkill -f datanode-agitator.pl 2>/dev/null
- echo "Stopping all processes matching 'hdfs-agitator.pl' as $HDFS_USER"
- sudo -u "$HDFS_USER" pkill -f hdfs-agitator.pl 2>/dev/null
- echo "Stopping all processes matching 'agitator.pl' as $ACCUMULO_USER"
- sudo -u "$ACCUMULO_USER" pkill -f agitator.pl 2>/dev/null
-fi
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-batchwalkers.sh
----------------------------------------------------------------------
diff --git a/continuous/stop-batchwalkers.sh b/continuous/stop-batchwalkers.sh
deleted file mode 100755
index 4696387..0000000
--- a/continuous/stop-batchwalkers.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
- bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
- SOURCE=$(readlink "${SOURCE}")
- [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
-script=$( basename "${SOURCE}" )
-# Stop: Resolve Script Directory
-
-CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
-. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
-
-pssh -h "$CONTINUOUS_CONF_DIR/batch_walkers.txt" "pkill -f '[o]rg.apache.accumulo.test.continuous.ContinuousBatchWalker'" < /dev/null
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-ingest.sh
----------------------------------------------------------------------
diff --git a/continuous/stop-ingest.sh b/continuous/stop-ingest.sh
deleted file mode 100755
index d159bf7..0000000
--- a/continuous/stop-ingest.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
- bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
- SOURCE=$(readlink "${SOURCE}")
- [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
-script=$( basename "${SOURCE}" )
-# Stop: Resolve Script Directory
-
-CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
-. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
-
-pssh -h "$CONTINUOUS_CONF_DIR/ingesters.txt" "pkill -f '[o]rg.apache.accumulo.test.continuous.ContinuousIngest'" < /dev/null
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-scanners.sh
----------------------------------------------------------------------
diff --git a/continuous/stop-scanners.sh b/continuous/stop-scanners.sh
deleted file mode 100755
index cf927b0..0000000
--- a/continuous/stop-scanners.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
- bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
- SOURCE=$(readlink "${SOURCE}")
- [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
-script=$( basename "${SOURCE}" )
-# Stop: Resolve Script Directory
-
-CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
-. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
-
-pssh -h "$CONTINUOUS_CONF_DIR/scanners.txt" "pkill -f '[o]rg.apache.accumulo.test.continuous.ContinuousScanner'" < /dev/null
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-stats.sh
----------------------------------------------------------------------
diff --git a/continuous/stop-stats.sh b/continuous/stop-stats.sh
deleted file mode 100755
index 9886eec..0000000
--- a/continuous/stop-stats.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
- bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
- SOURCE=$(readlink "${SOURCE}")
- [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
-script=$( basename "${SOURCE}" )
-# Stop: Resolve Script Directory
-
-CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
-. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
-
-pkill -f org.apache.accumulo.test.continuous.ContinuousStatsCollector
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/stop-walkers.sh
----------------------------------------------------------------------
diff --git a/continuous/stop-walkers.sh b/continuous/stop-walkers.sh
deleted file mode 100755
index 2c22cfa..0000000
--- a/continuous/stop-walkers.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
- bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
- SOURCE=$(readlink "${SOURCE}")
- [[ "${SOURCE}" != /* ]] && SOURCE="${bin}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-bin=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
-script=$( basename "${SOURCE}" )
-# Stop: Resolve Script Directory
-
-CONTINUOUS_CONF_DIR=${CONTINUOUS_CONF_DIR:-${bin}}
-. "$CONTINUOUS_CONF_DIR/continuous-env.sh"
-
-pssh -h "$CONTINUOUS_CONF_DIR/walkers.txt" "pkill -f '[o]rg.apache.accumulo.test.continuous.ContinuousWalk'" < /dev/null
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/tserver-agitator.pl
----------------------------------------------------------------------
diff --git a/continuous/tserver-agitator.pl b/continuous/tserver-agitator.pl
deleted file mode 100755
index 0e65a50..0000000
--- a/continuous/tserver-agitator.pl
+++ /dev/null
@@ -1,134 +0,0 @@
-#! /usr/bin/env perl
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-use POSIX qw(strftime);
-use Cwd qw();
-
-if(scalar(@ARGV) != 4 && scalar(@ARGV) != 2){
- print "Usage : tserver-agitator.pl <min sleep before kill in minutes>[:max sleep before kill in minutes] <min sleep before tup in minutes>[:max sleep before tup in minutes] [<min kill> <max kill>]\n";
- exit(1);
-}
-
-my $ACCUMULO_HOME;
-if( defined $ENV{'ACCUMULO_HOME'} ){
- $ACCUMULO_HOME = $ENV{'ACCUMULO_HOME'};
-} else {
- $cwd=Cwd::cwd();
- $ACCUMULO_HOME=$cwd . '/../../..';
-}
-
-print "ACCUMULO_HOME=$ACCUMULO_HOME\n";
-
-@sleeprange1 = split(/:/, $ARGV[0]);
-$sleep1 = $sleeprange1[0];
-
-@sleeprange2 = split(/:/, $ARGV[1]);
-$sleep2 = $sleeprange2[0];
-
-if (scalar(@sleeprange1) > 1) {
- $sleep1max = $sleeprange1[1] + 1;
-} else {
- $sleep1max = $sleep1;
-}
-
-if ($sleep1 > $sleep1max) {
- die("sleep1 > sleep1max $sleep1 > $sleep1max");
-}
-
-if (scalar(@sleeprange2) > 1) {
- $sleep2max = $sleeprange2[1] + 1;
-} else {
- $sleep2max = $sleep2;
-}
-
-if($sleep2 > $sleep2max){
- die("sleep2 > sleep2max $sleep2 > $sleep2max");
-}
-
-if(defined $ENV{'ACCUMULO_CONF_DIR'}){
- $ACCUMULO_CONF_DIR = $ENV{'ACCUMULO_CONF_DIR'};
-}else{
- $ACCUMULO_CONF_DIR = $ACCUMULO_HOME . '/conf';
-}
-
-if(scalar(@ARGV) == 4){
- $minKill = $ARGV[2];
- $maxKill = $ARGV[3];
-}else{
- $minKill = 1;
- $maxKill = 1;
-}
-
-if($minKill > $maxKill){
- die("minKill > maxKill $minKill > $maxKill");
-}
-
-@tserversRaw = `cat $ACCUMULO_CONF_DIR/tservers`;
-chomp(@tserversRaw);
-
-for $tserver (@tserversRaw){
- if($tserver eq "" || substr($tserver,0,1) eq "#"){
- next;
- }
-
- push(@tservers, $tserver);
-}
-
-
-if(scalar(@tservers) < $maxKill){
- print STDERR "WARN setting maxKill to ".scalar(@tservers)."\n";
- $maxKill = scalar(@tservers);
-}
-
-if ($minKill > $maxKill){
- print STDERR "WARN setting minKill to equal maxKill\n";
- $minKill = $maxKill;
-}
-
-while(1){
-
- $numToKill = int(rand($maxKill - $minKill + 1)) + $minKill;
- %killed = {};
- $server = "";
-
- for($i = 0; $i < $numToKill; $i++){
- while($server eq "" || $killed{$server} != undef){
- $index = int(rand(scalar(@tservers)));
- $server = $tservers[$index];
- }
-
- $killed{$server} = 1;
-
- $t = strftime "%Y%m%d %H:%M:%S", localtime;
-
- print STDERR "$t Killing tserver on $server\n";
- # We're the accumulo user, just run the commandj
- system("$ACCUMULO_HOME/bin/stop-server.sh $server 'accumulo-start.jar' tserver KILL");
- }
-
- $nextsleep2 = int(rand($sleep2max - $sleep2)) + $sleep2;
- sleep($nextsleep2 * 60);
- $t = strftime "%Y%m%d %H:%M:%S", localtime;
- print STDERR "$t Running tup\n";
- # restart the as them as the accumulo user
- system("$ACCUMULO_HOME/bin/tup.sh");
-
- $nextsleep1 = int(rand($sleep1max - $sleep1)) + $sleep1;
- sleep($nextsleep1 * 60);
-}
-
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/continuous/walkers.txt.example
----------------------------------------------------------------------
diff --git a/continuous/walkers.txt.example b/continuous/walkers.txt.example
deleted file mode 100644
index b59052d..0000000
--- a/continuous/walkers.txt.example
+++ /dev/null
@@ -1,17 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-host3
-host4
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/TestEnv.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/TestEnv.java b/core/src/main/java/org/apache/accumulo/testing/core/TestEnv.java
new file mode 100644
index 0000000..55fecb7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/TestEnv.java
@@ -0,0 +1,179 @@
+package org.apache.accumulo.testing.core;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.tools.CLI;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Test environment backed by a {@link Properties} object. Exposes the configured Accumulo
+ * credentials and connection settings, and lazily creates the Accumulo {@link Instance} and
+ * {@link Connector} on first use. The lazy initialization is not synchronized.
+ */
+public class TestEnv {
+
+  protected final Properties p;
+  private Instance instance = null;
+  private Connector connector = null;
+
+  /**
+   * Creates new test environment using provided properties
+   *
+   * @param p Properties; must not be null
+   * @throws NullPointerException if {@code p} is null
+   */
+  public TestEnv(Properties p) {
+    this.p = requireNonNull(p);
+  }
+
+  /**
+   * Gets a copy of the configuration properties.
+   *
+   * <p>
+   * The result is an independent snapshot: every string-valued property visible through
+   * {@link Properties#getProperty(String)} (including ones inherited from defaults) is copied
+   * into a new object, so later changes to either object do not affect the other.
+   *
+   * @return a copy of the configuration properties
+   */
+  public Properties copyConfigProperties() {
+    // new Properties(p) would only install p as live defaults of an empty object, so
+    // size()/keySet()/store() would see nothing and later changes to p would leak through.
+    Properties copy = new Properties();
+    for (String key : p.stringPropertyNames()) {
+      copy.setProperty(key, p.getProperty(key));
+    }
+    return copy;
+  }
+
+  /**
+   * Gets a configuration property.
+   *
+   * @param key key
+   * @return property value, or null if the key is not set
+   */
+  public String getConfigProperty(String key) {
+    return p.getProperty(key);
+  }
+
+  /**
+   * Gets the configured username.
+   *
+   * @return username
+   */
+  public String getAccumuloUserName() {
+    return p.getProperty(TestProps.ACCUMULO_USERNAME);
+  }
+
+  /**
+   * Gets the configured password.
+   *
+   * @return password
+   */
+  public String getAccumuloPassword() {
+    return p.getProperty(TestProps.ACCUMULO_PASSWORD);
+  }
+
+  /**
+   * Gets the configured keytab.
+   *
+   * @return path to keytab
+   */
+  public String getAccumuloKeytab() {
+    return p.getProperty(TestProps.ACCUMULO_KEYTAB);
+  }
+
+  /**
+   * Gets this process's ID.
+   *
+   * @return the part of the runtime MXBean name that precedes the first '@'
+   */
+  public String getPid() {
+    String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
+    return runtimeName.split("@")[0];
+  }
+
+  /**
+   * Creates a new Hadoop configuration with the MapReduce framework set to "yarn" and the
+   * "hdfs" and "file" filesystem implementations set explicitly.
+   *
+   * @return a newly created Hadoop configuration
+   */
+  public Configuration getHadoopConfiguration() {
+    Configuration config = new Configuration();
+    config.set("mapreduce.framework.name", "yarn");
+    // Setting below are required due to bundled jar breaking default config.
+    // See http://stackoverflow.com/questions/17265002/hadoop-no-filesystem-for-scheme-file
+    config.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+    config.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+    return config;
+  }
+
+  /**
+   * Gets an authentication token. A configured password takes precedence; otherwise the
+   * configured keytab is used to log in as the configured user and a Kerberos token is returned.
+   *
+   * @return a password token or a Kerberos token
+   * @throws IllegalArgumentException if neither password nor keytab is configured, or the keytab
+   *         path is not an existing regular file
+   * @throws RuntimeException if the keytab login fails with an I/O error
+   */
+  public AuthenticationToken getToken() {
+    String password = getAccumuloPassword();
+    if (null != password) {
+      // Reuse the value already read instead of looking the property up a second time.
+      return new PasswordToken(password);
+    }
+
+    String keytab = getAccumuloKeytab();
+    if (null == keytab) {
+      throw new IllegalArgumentException("Must provide password or keytab in configuration");
+    }
+
+    File keytabFile = new File(keytab);
+    if (!keytabFile.exists() || !keytabFile.isFile()) {
+      throw new IllegalArgumentException("Provided keytab is not a normal file: " + keytab);
+    }
+    try {
+      UserGroupInformation.loginUserFromKeytab(getAccumuloUserName(), keytabFile.getAbsolutePath());
+      return new KerberosToken();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to login", e);
+    }
+  }
+
+  /**
+   * Gets the configured Accumulo instance name.
+   *
+   * @return instance name
+   */
+  public String getAccumuloInstanceName() {
+    return p.getProperty(TestProps.ACCUMULO_INSTANCE);
+  }
+
+  /**
+   * Gets the configured ZooKeeper connection string.
+   *
+   * @return ZooKeeper hosts
+   */
+  public String getZookeepers() {
+    return p.getProperty(TestProps.ZOOKEEPERS);
+  }
+
+  /**
+   * Builds a client configuration from the default client configuration, overriding the
+   * instance name and ZooKeeper hosts with the configured values.
+   *
+   * @return client configuration
+   */
+  public ClientConfiguration getClientConfiguration() {
+    return ClientConfiguration.loadDefault().withInstance(getAccumuloInstanceName())
+        .withZkHosts(getZookeepers());
+  }
+
+  /**
+   * Gets an Accumulo instance object. The same instance is reused after the first call.
+   *
+   * @return Accumulo instance
+   */
+  public Instance getAccumuloInstance() {
+    if (instance == null) {
+      this.instance = new ZooKeeperInstance(getClientConfiguration());
+    }
+    return instance;
+  }
+
+  /**
+   * Gets an Accumulo connector. The same connector is reused after the first call.
+   *
+   * @return Accumulo connector
+   */
+  public Connector getAccumuloConnector() throws AccumuloException, AccumuloSecurityException {
+    if (connector == null) {
+      connector = getAccumuloInstance().getConnector(getAccumuloUserName(), getToken());
+    }
+    return connector;
+  }
+
+  /**
+   * Builds a batch writer configuration from the configured thread count, max latency (ms)
+   * and max memory (bytes).
+   *
+   * @return a new batch writer configuration
+   * @throws NumberFormatException if any of the three properties is missing or not a number
+   */
+  public BatchWriterConfig getBatchWriterConfig() {
+    int numThreads = Integer.parseInt(p.getProperty(TestProps.BW_NUM_THREADS));
+    long maxLatency = Long.parseLong(p.getProperty(TestProps.BW_MAX_LATENCY_MS));
+    long maxMemory = Long.parseLong(p.getProperty(TestProps.BW_MAX_MEM_BYTES));
+
+    BatchWriterConfig config = new BatchWriterConfig();
+    config.setMaxWriteThreads(numThreads);
+    config.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+    config.setMaxMemory(maxMemory);
+    return config;
+  }
+
+  /**
+   * Gets the configured scanner batch size.
+   *
+   * @return scanner batch size
+   * @throws NumberFormatException if the property is missing or not a number
+   */
+  public int getScannerBatchSize() {
+    return Integer.parseInt(p.getProperty(TestProps.SCANNER_BATCH_SIZE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java b/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
index f8ce9ca..e134c7f 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
@@ -17,11 +17,21 @@
package org.apache.accumulo.testing.core;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
public class TestProps {
private static final String PREFIX = "test.";
- private static final String RANDOMWALK = PREFIX + "randomwalk.";
private static final String COMMON = PREFIX + "common.";
+ private static final String CI = PREFIX + "ci.";
+ private static final String CI_COMMON = CI + "common.";
+ private static final String CI_INGEST = CI + "ingest.";
+ private static final String CI_WALKER = CI + "walker.";
+ private static final String CI_BW = CI + "batch.walker.";
+ private static final String CI_SCANNER = CI + "scanner.";
+ private static final String CI_VERIFY = CI + "verify.";
/** Common properties **/
// Zookeeper connection string
@@ -38,16 +48,80 @@ public class TestProps {
public static final String YARN_CONTAINER_MEMORY_MB = COMMON + "yarn.container.memory.mb";
// Number of cores given to each YARN container
public static final String YARN_CONTAINER_CORES = COMMON + "yarn.container.cores";
+ // Max memory (in bytes) each batch writer will use to buffer writes
+ public static final String BW_MAX_MEM_BYTES = COMMON + "bw.max.memory.bytes";
+ // Maximum time (in ms) each batch writer will buffer data
+ public static final String BW_MAX_LATENCY_MS = COMMON + "bw.max.latency.ms";
+ // Number of threads each batch writer will use to write data
+ public static final String BW_NUM_THREADS = COMMON + "bw.num.threads";
+ // Number of threads for each batch scanner. Uses its own "bs." key so it can be set
+ // independently of BW_NUM_THREADS (the two constants previously shared one key).
+ public static final String BS_NUM_THREADS = COMMON + "bs.num.threads";
+ // Number of key/value entries to pull during scan
+ public static final String SCANNER_BATCH_SIZE = COMMON + "scanner.batch.size";
+
+ /** Continuous ingest test properties **/
+ /** Common **/
+ // Accumulo table used by continuous ingest tests
+ public static final String CI_COMMON_ACCUMULO_TABLE = CI_COMMON + "accumulo.table";
+ // Number of tablets that should exist in Accumulo table when created
+ public static final String CI_COMMON_ACCUMULO_NUM_TABLETS = CI_COMMON + "accumulo.num.tablets";
+ // Optional authorization sets; if specified, scanners and walkers randomly select one set.
+ // Sets are separated by '|' and the authorizations within a set by ',' (e.g. a,b|a,b,c)
+ public static final String CI_COMMON_AUTHS = CI_COMMON + "auths";
+
+ /** Ingest **/
+ // Number of entries each ingest client should write
+ public static final String CI_INGEST_CLIENT_ENTRIES = CI_INGEST + "client.entries";
+ // Minimum random row to generate
+ public static final String CI_INGEST_ROW_MIN = CI_INGEST + "row.min";
+ // Maximum random row to generate
+ public static final String CI_INGEST_ROW_MAX = CI_INGEST + "row.max";
+ // Maximum number of random column families to generate
+ public static final String CI_INGEST_MAX_CF = CI_INGEST + "max.cf";
+ // Maximum number of random column qualifiers to generate
+ public static final String CI_INGEST_MAX_CQ = CI_INGEST + "max.cq";
+ // Optional visibilities (in CSV format) that if specified will be randomly selected by ingesters for
+ // each linked list
+ public static final String CI_INGEST_VISIBILITIES = CI_INGEST + "visibilities";
+ // Checksums will be generated during ingest if set to true
+ public static final String CI_INGEST_CHECKSUM = CI_INGEST + "checksum";
+
+ /** Batch Walker **/
+ // Sleep time between batch scans (in ms)
+ public static final String CI_BW_SLEEP_MS = CI_BW + "sleep.ms";
+ // Scan batch size
+ public static final String CI_BW_BATCH_SIZE = CI_BW + "batch.size";
+
+ /** Walker **/
+ // Sleep time between scans (in ms)
+ public static final String CI_WALKER_SLEEP_MS = CI_WALKER + "sleep.ms";
+
+ /** Scanner **/
+ // Sleep time between scans (in ms)
+ public static final String CI_SCANNER_SLEEP_MS = CI_SCANNER + "sleep.ms";
+ // Scanner entries
+ public static final String CI_SCANNER_ENTRIES = CI_SCANNER + "entries";
+ /** Verify **/
+ // Maximum number of mapreduce mappers
+ public static final String CI_VERIFY_MAX_MAPS = CI_VERIFY + "max.maps";
+ // Number of mapreduce reducers
+ public static final String CI_VERIFY_REDUCERS = CI_VERIFY + "reducers";
+ // Perform the verification directly on the files while the table is offline
+ public static final String CI_VERIFY_SCAN_OFFLINE = CI_VERIFY + "scan.offline";
+ // Comma separated list of auths to use for verify
+ public static final String CI_VERIFY_AUTHS = CI_VERIFY + "auths";
+ // Location in HDFS to store output
+ public static final String CI_VERIFY_OUTPUT_DIR = CI_VERIFY + "output.dir";
- /** Random walk properties **/
- // Number of random walker (if running in YARN)
- public static final String RW_NUM_WALKERS = RANDOMWALK + "num.walkers";
- // Max memory for multi-table batch writer
- public static final String RW_BW_MAX_MEM = RANDOMWALK + "bw.max.mem";
- // Max latency in milliseconds for multi-table batch writer
- public static final String RW_BW_MAX_LATENCY = RANDOMWALK + "bw.max.latency";
- // Number of write thread for multi-table batch writer
- public static final String RW_BW_NUM_THREADS = RANDOMWALK + "bw.num.threads";
+ /**
+  * Loads test properties from the file at the given path.
+  *
+  * @param propsFilePath path of the properties file to read
+  * @return the properties read from the file
+  * @throws IOException if the file cannot be opened or read
+  */
+ public static Properties loadFromFile(String propsFilePath) throws IOException {
+   return loadFromStream(new FileInputStream(propsFilePath));
+ }
+
+ /**
+  * Loads test properties from the given stream. The stream is always closed before this
+  * method returns, whether or not loading succeeds.
+  *
+  * @param fis open stream to read the properties from
+  * @return the properties read from the stream
+  * @throws IOException if reading from the stream fails
+  */
+ public static Properties loadFromStream(FileInputStream fis) throws IOException {
+   Properties props = new Properties();
+   // try-with-resources closes the stream even when load() throws; a plain close()
+   // after load() would leak the file handle on a read error.
+   try (FileInputStream in = fis) {
+     props.load(in);
+   }
+   return props;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java
index e89f2eb..0282c2b 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousBatchWalker.java
@@ -21,13 +21,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.accumulo.core.cli.BatchScannerOpts;
-import org.apache.accumulo.core.cli.ClientOnDefaultTable;
-import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
@@ -35,50 +33,44 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.TestProps;
import org.apache.hadoop.io.Text;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.validators.PositiveInteger;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
public class ContinuousBatchWalker {
- static class Opts extends ContinuousWalk.Opts {
- @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class)
- long numToScan = 0;
- }
-
public static void main(String[] args) throws Exception {
- Opts opts = new Opts();
- ScannerOpts scanOpts = new ScannerOpts();
- BatchScannerOpts bsOpts = new BatchScannerOpts();
- ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
- clientOpts.parseArgs(ContinuousBatchWalker.class.getName(), args, scanOpts, bsOpts, opts);
+ // Require the properties file path up front (same usage check as ContinuousIngest)
+ // rather than failing with an ArrayIndexOutOfBoundsException on args[0].
+ if (args.length != 1) {
+   System.err.println("Usage: ContinuousBatchWalker <propsPath>");
+   System.exit(-1);
+ }
+
+ Properties props = TestProps.loadFromFile(args[0]);
+
+ ContinuousEnv env = new ContinuousEnv(props);
+
+ Authorizations auths = env.getRandomAuthorizations();
+ Connector conn = env.getAccumuloConnector();
+ Scanner scanner = ContinuousUtil.createScanner(conn, env.getAccumuloTableName(), auths);
+ int scanBatchSize = Integer.parseInt(props.getProperty(TestProps.CI_BW_BATCH_SIZE));
+ scanner.setBatchSize(scanBatchSize);
Random r = new Random();
- Authorizations auths = opts.randomAuths.getAuths(r);
- Connector conn = clientOpts.getConnector();
- Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), auths);
- scanner.setBatchSize(scanOpts.scanBatchSize);
+ int scanThreads = Integer.parseInt(props.getProperty(TestProps.BS_NUM_THREADS));
+ // Parse the sleep interval once, before the loop: the properties do not change while
+ // the walker runs, and a malformed value now fails before the first scan.
+ int bwSleepMs = Integer.parseInt(props.getProperty(TestProps.CI_BW_SLEEP_MS));
while (true) {
- BatchScanner bs = conn.createBatchScanner(clientOpts.getTableName(), auths, bsOpts.scanThreads);
- bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
+ BatchScanner bs = conn.createBatchScanner(env.getAccumuloTableName(), auths, scanThreads);
- Set<Text> batch = getBatch(scanner, opts.min, opts.max, scanOpts.scanBatchSize, r);
+ Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), scanBatchSize, r);
List<Range> ranges = new ArrayList<>(batch.size());
for (Text row : batch) {
ranges.add(new Range(row));
}
- runBatchScan(scanOpts.scanBatchSize, bs, batch, ranges);
+ runBatchScan(scanBatchSize, bs, batch, ranges);
- sleepUninterruptibly(opts.sleepTime, TimeUnit.MILLISECONDS);
+ sleepUninterruptibly(bwSleepMs, TimeUnit.MILLISECONDS);
}
-
}
private static void runBatchScan(int batchSize, BatchScanner bs, Set<Text> batch, List<Range> ranges) {
@@ -117,7 +109,6 @@ public class ContinuousBatchWalker {
} else {
System.out.printf("BRQ %d %d %d %d %d%n", t1, (t2 - t1), rowsSeen.size(), count, (int) (rowsSeen.size() / ((t2 - t1) / 1000.0)));
}
-
}
private static void addRow(int batchSize, Value v) {
@@ -171,5 +162,4 @@ public class ContinuousBatchWalker {
return ret;
}
-
}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousEnv.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousEnv.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousEnv.java
new file mode 100644
index 0000000..7907ffd
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousEnv.java
@@ -0,0 +1,66 @@
+package org.apache.accumulo.testing.core.continuous;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.TestEnv;
+import org.apache.accumulo.testing.core.TestProps;
+
+/**
+ * Test environment for the continuous ingest tests. Adds typed accessors for the
+ * continuous-ingest properties on top of {@link TestEnv}.
+ */
+class ContinuousEnv extends TestEnv {
+
+ // Parsed from TestProps.CI_COMMON_AUTHS on first use and then reused; see getAuthList()
+ private List<Authorizations> authList;
+
+ ContinuousEnv(Properties props) {
+   super(props);
+ }
+
+ /**
+  * Parses the {@code TestProps.CI_COMMON_AUTHS} property on first call and caches the result.
+  * The value is a '|'-separated list of authorization sets, each set being a comma-separated
+  * list of authorizations. A missing or blank value yields a single empty authorization set.
+  *
+  * @return Accumulo authorizations list
+  */
+ private List<Authorizations> getAuthList() {
+   if (authList == null) {
+     String authValue = p.getProperty(TestProps.CI_COMMON_AUTHS);
+     if (authValue == null || authValue.trim().isEmpty()) {
+       authList = Collections.singletonList(Authorizations.EMPTY);
+     } else {
+       authList = new ArrayList<>();
+       // String.split takes a regex: an unescaped "|" is an empty alternation that matches
+       // between every character, so the pipe has to be escaped to split on it literally.
+       for (String a : authValue.split("\\|")) {
+         authList.add(new Authorizations(a.split(",")));
+       }
+     }
+   }
+   return authList;
+ }
+
+ /**
+  * @return one of the configured authorization sets, chosen at random
+  */
+ Authorizations getRandomAuthorizations() {
+   Random r = new Random();
+   return getAuthList().get(r.nextInt(getAuthList().size()));
+ }
+
+ /**
+  * @return value of {@code TestProps.CI_INGEST_ROW_MIN} parsed as a long
+  */
+ long getRowMin() {
+   return Long.parseLong(p.getProperty(TestProps.CI_INGEST_ROW_MIN));
+ }
+
+ /**
+  * @return value of {@code TestProps.CI_INGEST_ROW_MAX} parsed as a long
+  */
+ long getRowMax() {
+   return Long.parseLong(p.getProperty(TestProps.CI_INGEST_ROW_MAX));
+ }
+
+ /**
+  * @return value of {@code TestProps.CI_INGEST_MAX_CF} parsed as an int
+  */
+ int getMaxColF() {
+   return Integer.parseInt(p.getProperty(TestProps.CI_INGEST_MAX_CF));
+ }
+
+ /**
+  * @return value of {@code TestProps.CI_INGEST_MAX_CQ} parsed as an int
+  */
+ int getMaxColQ() {
+   return Integer.parseInt(p.getProperty(TestProps.CI_INGEST_MAX_CQ));
+ }
+
+ /**
+  * @return value of {@code TestProps.CI_COMMON_ACCUMULO_TABLE}, or null if it is not set
+  */
+ String getAccumuloTableName() {
+   return p.getProperty(TestProps.CI_COMMON_ACCUMULO_TABLE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
index 4681cb8..f260e78 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
@@ -18,18 +18,15 @@ package org.apache.accumulo.testing.core.continuous;
import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
-import org.apache.accumulo.core.cli.BatchWriterOpts;
-import org.apache.accumulo.core.cli.ClientOnDefaultTable;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -40,9 +37,7 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.trace.CountSampler;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.util.FastFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.accumulo.testing.core.TestProps;
import org.apache.hadoop.io.Text;
public class ContinuousIngest {
@@ -51,49 +46,44 @@ public class ContinuousIngest {
private static List<ColumnVisibility> visibilities;
- private static void initVisibilities(ContinuousOpts opts) throws Exception {
- if (opts.visFile == null) {
- visibilities = Collections.singletonList(new ColumnVisibility());
- return;
- }
-
- visibilities = new ArrayList<>();
-
- FileSystem fs = FileSystem.get(new Configuration());
- BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8));
-
- String line;
-
- while ((line = in.readLine()) != null) {
- visibilities.add(new ColumnVisibility(line));
- }
-
- in.close();
- }
-
private static ColumnVisibility getVisibility(Random rand) {
return visibilities.get(rand.nextInt(visibilities.size()));
}
public static void main(String[] args) throws Exception {
- ContinuousOpts opts = new ContinuousOpts();
- BatchWriterOpts bwOpts = new BatchWriterOpts();
- ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
- clientOpts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts, opts);
+ if (args.length != 1) {
+ System.err.println("Usage: ContinuousIngest <propsPath>");
+ System.exit(-1);
+ }
+
+ Properties props = TestProps.loadFromFile(args[0]);
+
+ String vis = props.getProperty(TestProps.CI_INGEST_VISIBILITIES);
+ if (vis == null) {
+ visibilities = Collections.singletonList(new ColumnVisibility());
+ } else {
+ visibilities = new ArrayList<>();
+ for (String v : vis.split(",")) {
+ visibilities.add(new ColumnVisibility(v.trim()));
+ }
+ }
- initVisibilities(opts);
+ ContinuousEnv env = new ContinuousEnv(props);
- if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) {
+ long rowMin = env.getRowMin();
+ long rowMax = env.getRowMax();
+ if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) {
throw new IllegalArgumentException("bad min and max");
}
- Connector conn = clientOpts.getConnector();
- if (!conn.tableOperations().exists(clientOpts.getTableName())) {
- throw new TableNotFoundException(null, clientOpts.getTableName(), "Consult the README and create the table before starting ingest.");
+ Connector conn = env.getAccumuloConnector();
+ String tableName = env.getAccumuloTableName();
+ if (!conn.tableOperations().exists(tableName)) {
+ throw new TableNotFoundException(null, tableName, "Consult the README and create the table before starting ingest.");
}
- BatchWriter bw = conn.createBatchWriter(clientOpts.getTableName(), bwOpts.getBatchWriterConfig());
+ BatchWriter bw = conn.createBatchWriter(tableName, env.getBatchWriterConfig());
bw = Trace.wrapAll(bw, new CountSampler(1024));
Random r = new Random();
@@ -117,61 +107,65 @@ public class ContinuousIngest {
long lastFlushTime = System.currentTimeMillis();
+ int maxColF = env.getMaxColF();
+ int maxColQ = env.getMaxColQ();
+ boolean checksum = Boolean.parseBoolean(props.getProperty(TestProps.CI_INGEST_CHECKSUM));
+ long numEntries = Long.parseLong(props.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+
out: while (true) {
// generate first set of nodes
ColumnVisibility cv = getVisibility(r);
for (int index = 0; index < flushInterval; index++) {
- long rowLong = genLong(opts.min, opts.max, r);
+ long rowLong = genLong(rowMin, rowMax, r);
prevRows[index] = rowLong;
firstRows[index] = rowLong;
- int cf = r.nextInt(opts.maxColF);
- int cq = r.nextInt(opts.maxColQ);
+ int cf = r.nextInt(maxColF);
+ int cq = r.nextInt(maxColQ);
firstColFams[index] = cf;
firstColQuals[index] = cq;
- Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum);
+ Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null,
+ checksum);
count++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
- if (count >= opts.num)
+ if (count >= numEntries)
break out;
// generate subsequent sets of nodes that link to previous set of nodes
for (int depth = 1; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
- long rowLong = genLong(opts.min, opts.max, r);
+ long rowLong = genLong(rowMin, rowMax, r);
byte[] prevRow = genRow(prevRows[index]);
prevRows[index] = rowLong;
- Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum);
+ Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), cv, ingestInstanceId, count, prevRow, checksum);
count++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
- if (count >= opts.num)
+ if (count >= numEntries)
break out;
}
// create one big linked list, this makes all of the first inserts
// point to something
for (int index = 0; index < flushInterval - 1; index++) {
- Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r,
- opts.checksum);
+ Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
count++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
- if (count >= opts.num)
+ if (count >= numEntries)
break out;
}
bw.close();
- clientOpts.stopTracing();
}
private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {
@@ -183,8 +177,9 @@ public class ContinuousIngest {
return lastFlushTime;
}
- public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r,
- boolean checksum) {
+ static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv,
+ byte[] ingestInstanceId, long count, byte[] prevRow,
+ boolean checksum) {
// Adler32 is supposed to be faster, but according to wikipedia is not good for small data.... so used CRC32 instead
CRC32 cksum = null;
@@ -207,15 +202,15 @@ public class ContinuousIngest {
return m;
}
- public static final long genLong(long min, long max, Random r) {
- return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min;
+ static long genLong(long min, long max, Random r) {
+ return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min;
}
- static final byte[] genRow(long min, long max, Random r) {
+ static byte[] genRow(long min, long max, Random r) {
return genRow(genLong(min, max, r));
}
- static final byte[] genRow(long rowLong) {
+ static byte[] genRow(long rowLong) {
return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java
index c2902ee..560e2ff 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java
@@ -19,12 +19,11 @@ package org.apache.accumulo.testing.core.continuous;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
+import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import org.apache.accumulo.core.cli.BatchWriterOpts;
-import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
@@ -34,6 +33,7 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.testing.core.TestProps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
@@ -43,13 +43,11 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.validators.PositiveInteger;
-
/**
- * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to
- * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes.
- *
+ * A map-only job that reads a table created by continuous ingest and creates a doubly linked
+ * list. This MapReduce job tests the ability of a map-only job to read and write to Accumulo
+ * at the same time. This MapReduce job mutates the table in such a way that it should not
+ * create any undefined nodes.
*/
public class ContinuousMoru extends Configured implements Tool {
private static final String PREFIX = ContinuousMoru.class.getSimpleName() + ".";
@@ -59,8 +57,8 @@ public class ContinuousMoru extends Configured implements Tool {
private static final String MIN = PREFIX + "MIN";
private static final String CI_ID = PREFIX + "CI_ID";
- static enum Counts {
- SELF_READ;
+ enum Counts {
+ SELF_READ
}
public static class CMapper extends Mapper<Key,Value,Text,Mutation> {
@@ -105,43 +103,36 @@ public class ContinuousMoru extends Configured implements Tool {
if (offset > 0) {
long rowLong = Long.parseLong(new String(val, offset, 16, UTF_8), 16);
Mutation m = ContinuousIngest.genMutation(rowLong, random.nextInt(max_cf), random.nextInt(max_cq), EMPTY_VIS, iiId, count++, key.getRowData()
- .toArray(), random, true);
+ .toArray(), true);
context.write(null, m);
}
} else {
- context.getCounter(Counts.SELF_READ).increment(1l);
+ context.getCounter(Counts.SELF_READ).increment(1L);
}
}
}
- static class Opts extends ContinuousOpts {
- @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class)
- short maxColF = Short.MAX_VALUE;
-
- @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class)
- short maxColQ = Short.MAX_VALUE;
-
- @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
- int maxMaps = 0;
- }
-
@Override
public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
- Opts opts = new Opts();
- BatchWriterOpts bwOpts = new BatchWriterOpts();
- MapReduceClientOnDefaultTable clientOpts = new MapReduceClientOnDefaultTable("ci");
- clientOpts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts, opts);
+
+ Properties props = TestProps.loadFromFile(args[0]);
+ ContinuousEnv env = new ContinuousEnv(props);
Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
job.setInputFormatClass(AccumuloInputFormat.class);
- clientOpts.setAccumuloConfigs(job);
+
+ AccumuloInputFormat.setConnectorInfo(job, env.getAccumuloUserName(), env.getToken());
+ AccumuloInputFormat.setInputTableName(job, env.getAccumuloTableName());
+ AccumuloInputFormat.setZooKeeperInstance(job, env.getClientConfiguration());
+
+ int maxMaps = Integer.parseInt(props.getProperty(TestProps.CI_VERIFY_MAX_MAPS));
// set up ranges
try {
- Set<Range> ranges = clientOpts.getConnector().tableOperations().splitRangeByTablets(clientOpts.getTableName(), new Range(), opts.maxMaps);
+ Set<Range> ranges = env.getAccumuloConnector().tableOperations().splitRangeByTablets(env.getAccumuloTableName(), new Range(), maxMaps);
AccumuloInputFormat.setRanges(job, ranges);
AccumuloInputFormat.setAutoAdjustRanges(job, false);
} catch (Exception e) {
@@ -149,31 +140,28 @@ public class ContinuousMoru extends Configured implements Tool {
}
job.setMapperClass(CMapper.class);
-
job.setNumReduceTasks(0);
-
job.setOutputFormatClass(AccumuloOutputFormat.class);
- AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig());
+ AccumuloOutputFormat.setBatchWriterOptions(job, env.getBatchWriterConfig());
+ AccumuloOutputFormat.setConnectorInfo(job, env.getAccumuloUserName(), env.getToken());
+ AccumuloOutputFormat.setCreateTables(job, true);
+ AccumuloOutputFormat.setDefaultTableName(job, env.getAccumuloTableName());
+ AccumuloOutputFormat.setZooKeeperInstance(job, env.getClientConfiguration());
Configuration conf = job.getConfiguration();
- conf.setLong(MIN, opts.min);
- conf.setLong(MAX, opts.max);
- conf.setInt(MAX_CF, opts.maxColF);
- conf.setInt(MAX_CQ, opts.maxColQ);
+ conf.setLong(MIN, env.getRowMin());
+ conf.setLong(MAX, env.getRowMax());
+ conf.setInt(MAX_CF, env.getMaxColF());
+ conf.setInt(MAX_CQ, env.getMaxColQ());
conf.set(CI_ID, UUID.randomUUID().toString());
job.waitForCompletion(true);
- clientOpts.stopTracing();
return job.isSuccessful() ? 0 : 1;
}
- /**
- *
- * @param args
- * instanceName zookeepers username password table columns outputpath
- */
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args);
+ ContinuousEnv env = new ContinuousEnv(TestProps.loadFromFile(args[0]));
+ int res = ToolRunner.run(env.getHadoopConfiguration(), new ContinuousMoru(), args);
if (res != 0)
System.exit(res);
}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java
deleted file mode 100644
index 8180383..0000000
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.testing.core.continuous;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.util.Map.Entry;
-import java.util.Random;
-
-import org.apache.accumulo.core.cli.ClientOnDefaultTable;
-import org.apache.accumulo.core.cli.ClientOpts.TimeConverter;
-import org.apache.accumulo.core.cli.ScannerOpts;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.io.Text;
-
-import com.beust.jcommander.Parameter;
-
-public class ContinuousQuery {
-
- public static class Opts extends ContinuousOpts {
- @Parameter(names = "--sleep", description = "the time to wait between queries", converter = TimeConverter.class)
- long sleepTime = 100;
- }
-
- public static void main(String[] args) throws Exception {
- Opts opts = new Opts();
- ScannerOpts scanOpts = new ScannerOpts();
- ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
- clientOpts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts, opts);
-
- Connector conn = clientOpts.getConnector();
- Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), clientOpts.auths);
- scanner.setBatchSize(scanOpts.scanBatchSize);
-
- Random r = new Random();
-
- while (true) {
- byte[] row = ContinuousIngest.genRow(opts.min, opts.max, r);
-
- int count = 0;
-
- long t1 = System.currentTimeMillis();
- scanner.setRange(new Range(new Text(row)));
- for (Entry<Key,Value> entry : scanner) {
- ContinuousWalk.validate(entry.getKey(), entry.getValue());
- count++;
- }
- long t2 = System.currentTimeMillis();
-
- System.out.printf("SRQ %d %s %d %d%n", t1, new String(row, UTF_8), (t2 - t1), count);
-
- if (opts.sleepTime > 0)
- Thread.sleep(opts.sleepTime);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java
index 42e0ea8..162e64d 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java
@@ -20,49 +20,44 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import org.apache.accumulo.core.cli.ClientOnDefaultTable;
-import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.TestProps;
import org.apache.hadoop.io.Text;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.validators.PositiveInteger;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
public class ContinuousScanner {
- static class Opts extends ContinuousWalk.Opts {
- @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class)
- long numToScan = 0;
- }
-
public static void main(String[] args) throws Exception {
- Opts opts = new Opts();
- ScannerOpts scanOpts = new ScannerOpts();
- ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
- clientOpts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts, opts);
+
+ Properties props = TestProps.loadFromFile(args[0]);
+ ContinuousEnv env = new ContinuousEnv(props);
Random r = new Random();
long distance = 1000000000000l;
- Connector conn = clientOpts.getConnector();
- Authorizations auths = opts.randomAuths.getAuths(r);
- Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), auths);
- scanner.setBatchSize(scanOpts.scanBatchSize);
+ Connector conn = env.getAccumuloConnector();
+ Authorizations auths = env.getRandomAuthorizations();
+ Scanner scanner = ContinuousUtil.createScanner(conn, env.getAccumuloTableName(), auths);
+ scanner.setBatchSize(env.getScannerBatchSize());
- double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0));
+ int numToScan = Integer.parseInt(props.getProperty(TestProps.CI_SCANNER_ENTRIES));
+ int scannerSleepMs = Integer.parseInt(props.getProperty(TestProps.CI_SCANNER_SLEEP_MS));
+
+ double delta = Math.min(.05, .05 / (numToScan / 1000.0));
while (true) {
- long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r);
+ long startRow = ContinuousIngest.genLong(env.getRowMin(), env.getRowMax() - distance, r);
byte[] scanStart = ContinuousIngest.genRow(startRow);
byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
@@ -83,13 +78,13 @@ public class ContinuousScanner {
// System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
- if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) {
+ if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
if (count == 0) {
distance = distance * 10;
if (distance < 0)
distance = 1000000000000l;
} else {
- double ratio = (double) opts.numToScan / count;
+ double ratio = (double) numToScan / count;
// move ratio closer to 1 to make change slower
ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
distance = (long) (ratio * distance);
@@ -100,8 +95,9 @@ public class ContinuousScanner {
System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
- if (opts.sleepTime > 0)
- sleepUninterruptibly(opts.sleepTime, TimeUnit.MILLISECONDS);
+ if (scannerSleepMs > 0) {
+ sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java
deleted file mode 100644
index 818e387..0000000
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.testing.core.continuous;
-
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.accumulo.core.cli.ScannerOpts;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.ClientContext;
-import org.apache.accumulo.core.client.impl.Credentials;
-import org.apache.accumulo.core.client.impl.MasterClient;
-import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.iterators.ColumnFamilyCounter;
-import org.apache.accumulo.core.master.thrift.MasterClientService;
-import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
-import org.apache.accumulo.core.master.thrift.TableInfo;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.cli.ClientOnRequiredTable;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.util.TableInfoUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ContinuousStatsCollector {
-
- private static final Logger log = LoggerFactory.getLogger(ContinuousStatsCollector.class);
-
- static class StatsCollectionTask extends TimerTask {
-
- private final String tableId;
- private final Opts opts;
- private final int scanBatchSize;
-
- public StatsCollectionTask(Opts opts, int scanBatchSize) {
- this.opts = opts;
- this.scanBatchSize = scanBatchSize;
- this.tableId = Tables.getNameToIdMap(opts.getInstance()).get(opts.getTableName());
- System.out
- .println("TIME TABLET_SERVERS TOTAL_ENTRIES TOTAL_INGEST TOTAL_QUERY TABLE_RECS TABLE_RECS_IN_MEM TABLE_INGEST TABLE_QUERY TABLE_TABLETS TABLE_TABLETS_ONLINE"
- + " ACCUMULO_DU ACCUMULO_DIRS ACCUMULO_FILES TABLE_DU TABLE_DIRS TABLE_FILES"
- + " MAP_TASK MAX_MAP_TASK REDUCE_TASK MAX_REDUCE_TASK TASK_TRACKERS BLACK_LISTED MIN_FILES/TABLET MAX_FILES/TABLET AVG_FILES/TABLET STDDEV_FILES/TABLET");
- }
-
- @Override
- public void run() {
- try {
- String acuStats = getACUStats();
- String fsStats = getFSStats();
- String mrStats = getMRStats();
- String tabletStats = getTabletStats();
-
- System.out.println(System.currentTimeMillis() + " " + acuStats + " " + fsStats + " " + mrStats + " " + tabletStats);
- } catch (Exception e) {
- log.error(System.currentTimeMillis() + " - Failed to collect stats", e);
- }
- }
-
- private String getTabletStats() throws Exception {
-
- Connector conn = opts.getConnector();
- Scanner scanner = conn.createScanner(MetadataTable.NAME, opts.auths);
- scanner.setBatchSize(scanBatchSize);
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner.addScanIterator(new IteratorSetting(1000, "cfc", ColumnFamilyCounter.class.getName()));
- scanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
-
- Stat s = new Stat();
-
- int count = 0;
- for (Entry<Key,Value> entry : scanner) {
- count++;
- s.addStat(Long.parseLong(entry.getValue().toString()));
- }
-
- if (count > 0)
- return String.format("%d %d %.3f %.3f", s.getMin(), s.getMax(), s.getAverage(), s.getStdDev());
- else
- return "0 0 0 0";
-
- }
-
- private String getFSStats() throws Exception {
- VolumeManager fs = VolumeManagerImpl.get();
- long length1 = 0, dcount1 = 0, fcount1 = 0;
- long length2 = 0, dcount2 = 0, fcount2 = 0;
- for (String dir : ServerConstants.getTablesDirs()) {
- ContentSummary contentSummary = fs.getContentSummary(new Path(dir));
- length1 += contentSummary.getLength();
- dcount1 += contentSummary.getDirectoryCount();
- fcount1 += contentSummary.getFileCount();
- contentSummary = fs.getContentSummary(new Path(dir, tableId));
- length2 += contentSummary.getLength();
- dcount2 += contentSummary.getDirectoryCount();
- fcount2 += contentSummary.getFileCount();
- }
-
- return "" + length1 + " " + dcount1 + " " + fcount1 + " " + length2 + " " + dcount2 + " " + fcount2;
- }
-
- private String getACUStats() throws Exception {
-
- MasterClientService.Iface client = null;
- while (true) {
- try {
- ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.getPrincipal(), opts.getToken()), new ServerConfigurationFactory(
- opts.getInstance()).getConfiguration());
- client = MasterClient.getConnectionWithRetry(context);
- MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
-
- TableInfo all = new TableInfo();
- Map<String,TableInfo> tableSummaries = new HashMap<>();
-
- for (TabletServerStatus server : stats.tServerInfo) {
- for (Entry<String,TableInfo> info : server.tableMap.entrySet()) {
- TableInfo tableSummary = tableSummaries.get(info.getKey());
- if (tableSummary == null) {
- tableSummary = new TableInfo();
- tableSummaries.put(info.getKey(), tableSummary);
- }
- TableInfoUtil.add(tableSummary, info.getValue());
- TableInfoUtil.add(all, info.getValue());
- }
- }
-
- TableInfo ti = tableSummaries.get(tableId);
-
- return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate + " " + (long) all.queryRate + " " + ti.recs + " "
- + ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate + " " + ti.tablets + " " + ti.onlineTablets;
-
- } catch (ThriftNotActiveServiceException e) {
- // Let it loop, fetching a new location
- log.debug("Contacted a Master which is no longer active, retrying");
- sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- } finally {
- if (client != null)
- MasterClient.close(client);
- }
- }
- }
-
- }
-
- private static String getMRStats() throws Exception {
- Configuration conf = CachedConfiguration.getInstance();
- // No alternatives for hadoop 20
- JobClient jc = new JobClient(new org.apache.hadoop.mapred.JobConf(conf));
-
- ClusterStatus cs = jc.getClusterStatus(false);
-
- return "" + cs.getMapTasks() + " " + cs.getMaxMapTasks() + " " + cs.getReduceTasks() + " " + cs.getMaxReduceTasks() + " " + cs.getTaskTrackers() + " "
- + cs.getBlacklistedTrackers();
-
- }
-
- static class Opts extends ClientOnRequiredTable {}
-
- public static void main(String[] args) {
- Opts opts = new Opts();
- ScannerOpts scanOpts = new ScannerOpts();
- opts.parseArgs(ContinuousStatsCollector.class.getName(), args, scanOpts);
- Timer jtimer = new Timer();
-
- jtimer.schedule(new StatsCollectionTask(opts, scanOpts.scanBatchSize), 0, 30000);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java
index 64f8a35..430bf3b 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java
@@ -22,16 +22,16 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Properties;
import java.util.Random;
import java.util.Set;
-import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.testing.core.TestProps;
import org.apache.accumulo.testing.core.continuous.ContinuousWalk.BadChecksumException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
@@ -47,13 +47,9 @@ import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.validators.PositiveInteger;
-
/**
* A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
*/
-
public class ContinuousVerify extends Configured implements Tool {
public static final VLongWritable DEF = new VLongWritable(-1);
@@ -76,7 +72,7 @@ public class ContinuousVerify extends Configured implements Tool {
try {
ContinuousWalk.validate(key, data);
} catch (BadChecksumException bce) {
- context.getCounter(Counts.CORRUPT).increment(1l);
+ context.getCounter(Counts.CORRUPT).increment(1L);
if (corrupt < 1000) {
log.error("Bad checksum : " + key);
} else if (corrupt == 1000) {
@@ -100,7 +96,7 @@ public class ContinuousVerify extends Configured implements Tool {
}
}
- public static enum Counts {
+ public enum Counts {
UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT
}
@@ -131,95 +127,82 @@ public class ContinuousVerify extends Configured implements Tool {
}
context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(sb.toString()));
- context.getCounter(Counts.UNDEFINED).increment(1l);
+ context.getCounter(Counts.UNDEFINED).increment(1L);
} else if (defCount > 0 && refs.size() == 0) {
- context.getCounter(Counts.UNREFERENCED).increment(1l);
+ context.getCounter(Counts.UNREFERENCED).increment(1L);
} else {
- context.getCounter(Counts.REFERENCED).increment(1l);
+ context.getCounter(Counts.REFERENCED).increment(1L);
}
}
}
- static class Opts extends MapReduceClientOnDefaultTable {
- @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist")
- String outputDir = "/tmp/continuousVerify";
-
- @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", validateWith = PositiveInteger.class)
- int maxMaps = 1;
-
- @Parameter(names = "--reducers", description = "the number of reducers to use", validateWith = PositiveInteger.class)
- int reducers = 1;
-
- @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
- boolean scanOffline = false;
-
- public Opts() {
- super("ci");
- }
- }
-
@Override
public int run(String[] args) throws Exception {
- Opts opts = new Opts();
- opts.parseArgs(this.getClass().getName(), args);
+
+ Properties props = TestProps.loadFromFile(args[0]);
+ ContinuousEnv env = new ContinuousEnv(props);
Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
job.setInputFormatClass(AccumuloInputFormat.class);
- opts.setAccumuloConfigs(job);
- Set<Range> ranges = null;
- String clone = opts.getTableName();
- Connector conn = null;
+ boolean scanOffline = Boolean.parseBoolean(props.getProperty(TestProps.CI_VERIFY_SCAN_OFFLINE));
+ String tableName = env.getAccumuloTableName();
+ int maxMaps = Integer.parseInt(props.getProperty(TestProps.CI_VERIFY_MAX_MAPS));
+ int reducers = Integer.parseInt(props.getProperty(TestProps.CI_VERIFY_REDUCERS));
+ String outputDir = props.getProperty(TestProps.CI_VERIFY_OUTPUT_DIR);
+
+ Set<Range> ranges;
+ String clone = "";
+ Connector conn = env.getAccumuloConnector();
- if (opts.scanOffline) {
+ if (scanOffline) {
Random random = new Random();
- clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
- conn = opts.getConnector();
- conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
- ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+ clone = tableName + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffL));
+ conn.tableOperations().clone(tableName, clone, true, new HashMap<>(), new HashSet<>());
+ ranges = conn.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps);
conn.tableOperations().offline(clone);
AccumuloInputFormat.setInputTableName(job, clone);
AccumuloInputFormat.setOfflineTableScan(job, true);
} else {
- ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+ ranges = conn.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps);
+ AccumuloInputFormat.setInputTableName(job, tableName);
}
-
+
AccumuloInputFormat.setRanges(job, ranges);
AccumuloInputFormat.setAutoAdjustRanges(job, false);
+ AccumuloInputFormat.setConnectorInfo(job, env.getAccumuloUserName(), env.getToken());
+ AccumuloInputFormat.setZooKeeperInstance(job, env.getClientConfiguration());
job.setMapperClass(CMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(VLongWritable.class);
job.setReducerClass(CReducer.class);
- job.setNumReduceTasks(opts.reducers);
+ job.setNumReduceTasks(reducers);
job.setOutputFormatClass(TextOutputFormat.class);
- job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
+ job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", scanOffline);
- TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
+ TextOutputFormat.setOutputPath(job, new Path(outputDir));
job.waitForCompletion(true);
- if (opts.scanOffline) {
+ if (scanOffline) {
conn.tableOperations().delete(clone);
}
- opts.stopTracing();
return job.isSuccessful() ? 0 : 1;
}
- /**
- *
- * @param args
- * instanceName zookeepers username password table columns outputpath
- */
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
+
+ ContinuousEnv env = new ContinuousEnv(TestProps.loadFromFile(args[0]));
+
+ int res = ToolRunner.run(env.getHadoopConfiguration(), new ContinuousVerify(), args);
if (res != 0)
System.exit(res);
}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/fc3ddfc4/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java
index 2335fd4..49c10c9 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java
@@ -18,106 +18,49 @@ package org.apache.accumulo.testing.core.continuous;
import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Random;
import java.util.zip.CRC32;
-import org.apache.accumulo.core.cli.ClientOnDefaultTable;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.accumulo.testing.core.TestProps;
import org.apache.hadoop.io.Text;
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.Parameter;
-
public class ContinuousWalk {
- static public class Opts extends ContinuousQuery.Opts {
- class RandomAuthsConverter implements IStringConverter<RandomAuths> {
- @Override
- public RandomAuths convert(String value) {
- try {
- return new RandomAuths(value);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Parameter(names = "--authsFile", description = "read the authorities to use from a file")
- RandomAuths randomAuths = new RandomAuths();
- }
-
static class BadChecksumException extends RuntimeException {
private static final long serialVersionUID = 1L;
- public BadChecksumException(String msg) {
+ BadChecksumException(String msg) {
super(msg);
}
}
- static class RandomAuths {
- private List<Authorizations> auths;
-
- RandomAuths() {
- auths = Collections.singletonList(Authorizations.EMPTY);
- }
-
- RandomAuths(String file) throws IOException {
- if (file == null) {
- auths = Collections.singletonList(Authorizations.EMPTY);
- return;
- }
-
- auths = new ArrayList<>();
-
- FileSystem fs = FileSystem.get(new Configuration());
- BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), UTF_8));
- try {
- String line;
- while ((line = in.readLine()) != null) {
- auths.add(new Authorizations(line.split(",")));
- }
- } finally {
- in.close();
- }
- }
-
- Authorizations getAuths(Random r) {
- return auths.get(r.nextInt(auths.size()));
- }
- }
-
public static void main(String[] args) throws Exception {
- Opts opts = new Opts();
- ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
- clientOpts.parseArgs(ContinuousWalk.class.getName(), args, opts);
- Connector conn = clientOpts.getConnector();
+ Properties props = TestProps.loadFromFile(args[0]);
+ ContinuousEnv env = new ContinuousEnv(props);
+
+ Connector conn = env.getAccumuloConnector();
Random r = new Random();
ArrayList<Value> values = new ArrayList<>();
+ int sleepTime = Integer.parseInt(props.getProperty(TestProps.CI_WALKER_SLEEP_MS));
+
while (true) {
- Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), opts.randomAuths.getAuths(r));
- String row = findAStartRow(opts.min, opts.max, scanner, r);
+ Scanner scanner = ContinuousUtil.createScanner(conn, env.getAccumuloTableName(), env.getRandomAuthorizations());
+ String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, r);
while (row != null) {
@@ -146,12 +89,12 @@ public class ContinuousWalk {
row = null;
}
- if (opts.sleepTime > 0)
- Thread.sleep(opts.sleepTime);
+ if (sleepTime > 0)
+ Thread.sleep(sleepTime);
}
- if (opts.sleepTime > 0)
- Thread.sleep(opts.sleepTime);
+ if (sleepTime > 0)
+ Thread.sleep(sleepTime);
}
}
@@ -197,7 +140,7 @@ public class ContinuousWalk {
return -1;
}
- static String getPrevRow(Value value) {
+ private static String getPrevRow(Value value) {
byte[] val = value.get();
int offset = getPrevRowOffset(val);
@@ -208,7 +151,7 @@ public class ContinuousWalk {
return null;
}
- static int getChecksumOffset(byte val[]) {
+ private static int getChecksumOffset(byte val[]) {
if (val[val.length - 1] != ':') {
if (val[val.length - 9] != ':')
throw new IllegalArgumentException(new String(val, UTF_8));