Posted to commits@flink.apache.org by rm...@apache.org on 2017/01/24 07:44:37 UTC

[1/2] flink git commit: [FLINK-5613][query] querying a non-existing key is inconsistent among state backends

Repository: flink
Updated Branches:
  refs/heads/release-1.2 b323f66a6 -> 44f7098d7


[FLINK-5613][query] querying a non-existing key is inconsistent among state backends

Querying a non-existing key of a state that has a default value set
currently results in an UnknownKeyOrNamespace exception when the
MemoryStateBackend or FsStateBackend is used, but returns the default
value if the RocksDBStateBackend is set.

This removes the special handling from the RocksDBStateBackend and makes
it consistent with the other two backends, i.e. it now returns null,
which results in the mentioned UnknownKeyOrNamespace exception.
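
For queryable state clients this means the default value is no longer
applied on the server side. Below is a minimal client-side sketch of the
fallback an application may now need, assuming the Flink 1.2 API used in
the test changes further down; "future", "deadline" and "valueState" are
hypothetical placeholders for an actual query future, timeout and
ValueStateDescriptor:

    byte[] serializedResult;
    try {
        // now fails with UnknownKeyOrNamespace for a key that was never
        // written, regardless of the configured state backend
        serializedResult = Await.result(future, deadline.timeLeft());
    } catch (UnknownKeyOrNamespace e) {
        // re-create the default value on the client side instead, mirroring
        // what RocksDBValueState#getSerializedValue did before this change
        serializedResult = KvStateRequestSerializer.serializeValue(
            valueState.getDefaultValue(), valueState.getSerializer());
    }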


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44f7098d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44f7098d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44f7098d

Branch: refs/heads/release-1.2
Commit: 44f7098d74476ab561f38c8ced5f3a0b5f9bd0c2
Parents: 2e175fb
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 17 14:26:16 2017 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Jan 24 08:44:08 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBValueState.java      |  12 --
 .../flink/test/query/QueryableStateITCase.java  | 160 ++++++++++++++++++-
 2 files changed, 153 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44f7098d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 9563ed8..d8d89ba 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
@@ -102,15 +101,4 @@ public class RocksDBValueState<K, N, V>
 			throw new RuntimeException("Error while adding data to RocksDB", e);
 		}
 	}
-
-	@Override
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		byte[] value = super.getSerializedValue(serializedKeyAndNamespace);
-
-		if (value != null) {
-			return value;
-		} else {
-			return KvStateRequestSerializer.serializeValue(stateDesc.getDefaultValue(), stateDesc.getSerializer());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44f7098d/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 327a715..c2df6ae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -50,10 +51,14 @@ import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.QueryableStateClient;
+import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
@@ -67,7 +72,9 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -102,6 +109,9 @@ public class QueryableStateITCase extends TestLogger {
 	private final static int NUM_SLOTS_PER_TM = 4;
 	private final static int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM;
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	/**
 	 * Shared between all the tests. Make sure to have at least NUM_SLOTS
 	 * available after your test finishes, e.g. cancel the job you submitted.
@@ -229,7 +239,8 @@ public class QueryableStateITCase extends TestLogger {
 							queryName,
 							key,
 							serializedKey,
-							QUERY_RETRY_DELAY);
+							QUERY_RETRY_DELAY,
+							false);
 
 					serializedResult.onSuccess(new OnSuccess<byte[]>() {
 						@Override
@@ -352,7 +363,8 @@ public class QueryableStateITCase extends TestLogger {
 						queryName,
 						key,
 						serializedKey,
-						QUERY_RETRY_DELAY);
+						QUERY_RETRY_DELAY,
+						false);
 
 				byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());
 
@@ -455,7 +467,8 @@ public class QueryableStateITCase extends TestLogger {
 					queryName,
 					key,
 					serializedKey,
-					QUERY_RETRY_DELAY);
+					QUERY_RETRY_DELAY,
+					false);
 
 			byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());
 
@@ -731,7 +744,8 @@ public class QueryableStateITCase extends TestLogger {
 					queryableState.getQueryableStateName(),
 					key,
 					serializedKey,
-					QUERY_RETRY_DELAY);
+					QUERY_RETRY_DELAY,
+					false);
 
 				byte[] serializedValue = Await.result(future, deadline.timeLeft());
 
@@ -753,6 +767,132 @@ public class QueryableStateITCase extends TestLogger {
 	}
 
 	/**
+	 * Tests a simple queryable value state instance with a default value
+	 * set, using the {@link MemoryStateBackend}.
+	 */
+	@Test(expected = UnknownKeyOrNamespace.class)
+	public void testValueStateDefaultValueMemoryBackend() throws Exception {
+		testValueStateDefault(new MemoryStateBackend());
+	}
+
+	/**
+	 * Tests a simple queryable value state instance with a default value
+	 * set, using the {@link RocksDBStateBackend}.
+	 */
+	@Test(expected = UnknownKeyOrNamespace.class)
+	public void testValueStateDefaultValueRocksDBBackend() throws Exception {
+		testValueStateDefault(new RocksDBStateBackend(
+			temporaryFolder.newFolder().toURI().toString()));
+	}
+
+	/**
+	 * Tests a simple queryable value state instance with a default value
+	 * set, using the {@link FsStateBackend}.
+	 */
+	@Test(expected = UnknownKeyOrNamespace.class)
+	public void testValueStateDefaultValueFsBackend() throws Exception {
+		testValueStateDefault(new FsStateBackend(
+			temporaryFolder.newFolder().toURI().toString()));
+	}
+
+	/**
+	 * Tests a simple queryable value state instance with a default value
+	 * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
+	 * tuples; every key is mapped to 1, but key 0 is queried, which should
+	 * throw an {@link UnknownKeyOrNamespace} exception.
+	 *
+	 * @param stateBackend state backend to use for the job
+	 *
+	 * @throws UnknownKeyOrNamespace thrown due to querying a non-existent key
+	 */
+	void testValueStateDefault(final AbstractStateBackend stateBackend) throws
+		Exception, UnknownKeyOrNamespace {
+
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final int numElements = 1024;
+
+		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env =
+				StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(NUM_SLOTS);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies
+				.fixedDelayRestart(Integer.MAX_VALUE, 1000));
+
+			env.setStateBackend(stateBackend);
+
+			DataStream<Tuple2<Integer, Long>> source = env
+				.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+				new ValueStateDescriptor<>(
+					"any",
+					source.getType(),
+					Tuple2.of(0, 1337L));
+
+			// only expose key "1"
+			QueryableStateStream<Integer, Tuple2<Integer, Long>>
+				queryableState =
+				source.keyBy(
+					new KeySelector<Tuple2<Integer, Long>, Integer>() {
+						@Override
+						public Integer getKey(
+							Tuple2<Integer, Long> value) throws
+							Exception {
+							return 1;
+						}
+					}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// Now query
+			int key = 0;
+			final byte[] serializedKey =
+				KvStateRequestSerializer.serializeKeyAndNamespace(
+					key,
+					queryableState.getKeySerializer(),
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE);
+
+			Future<byte[]> future = getKvStateWithRetries(client,
+				jobId,
+				queryableState.getQueryableStateName(),
+				key,
+				serializedKey,
+				QUERY_RETRY_DELAY,
+				true);
+
+			Await.result(future, deadline.timeLeft());
+		} finally {
+			// Free cluster resources
+			if (jobId != null) {
+				Future<CancellationSuccess> cancellation = cluster
+					.getLeaderGateway(deadline.timeLeft())
+					.ask(new JobManagerMessages.CancelJob(jobId),
+						deadline.timeLeft())
+					.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(
+						CancellationSuccess.class));
+
+				Await.ready(cancellation, deadline.timeLeft());
+			}
+
+			client.shutDown();
+		}
+	}
+
+	/**
 	 * Tests simple value state queryable state instance. Each source emits
 	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
 	 * queried. The tests succeeds after each subtask index is queried with
@@ -883,7 +1023,8 @@ public class QueryableStateITCase extends TestLogger {
 							queryableState.getQueryableStateName(),
 							key,
 							serializedKey,
-							QUERY_RETRY_DELAY);
+							QUERY_RETRY_DELAY,
+							false);
 
 					byte[] serializedValue = Await.result(future, deadline.timeLeft());
 
@@ -993,7 +1134,8 @@ public class QueryableStateITCase extends TestLogger {
 			final String queryName,
 			final int key,
 			final byte[] serializedKey,
-			final FiniteDuration retryDelay) {
+			final FiniteDuration retryDelay,
+			final boolean failForUnknownKeyOrNamespace) {
 
 		return client.getKvState(jobId, queryName, key, serializedKey)
 				.recoverWith(new Recover<Future<byte[]>>() {
@@ -1001,6 +1143,9 @@ public class QueryableStateITCase extends TestLogger {
 					public Future<byte[]> recover(Throwable failure) throws Throwable {
 						if (failure instanceof AssertionError) {
 							return Futures.failed(failure);
+						} else if (failForUnknownKeyOrNamespace &&
+								(failure instanceof UnknownKeyOrNamespace)) {
+							return Futures.failed(failure);
 						} else {
 							// At startup some failures are expected
 							// due to races. Make sure that they don't
@@ -1018,7 +1163,8 @@ public class QueryableStateITCase extends TestLogger {
 													queryName,
 													key,
 													serializedKey,
-													retryDelay);
+													retryDelay,
+													failForUnknownKeyOrNamespace);
 										}
 									});
 						}
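
Note on the test helper: the new boolean parameter of getKvStateWithRetries
decides whether an UnknownKeyOrNamespace failure terminates the query or is
treated as a transient startup race and retried. As a sketch of the two call
modes used in the diff above:

    // regular tests: keep retrying, UnknownKeyOrNamespace may be a race
    Future<byte[]> result = getKvStateWithRetries(client, jobId, queryName,
        key, serializedKey, QUERY_RETRY_DELAY, false);

    // default-value tests: let UnknownKeyOrNamespace fail the future at once
    Future<byte[]> failing = getKvStateWithRetries(client, jobId, queryName,
        key, serializedKey, QUERY_RETRY_DELAY, true);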


[2/2] flink git commit: [FLINK-5395] [Build System] support building the distribution locally via the create_release_files.sh script

Posted by rm...@apache.org.
[FLINK-5395] [Build System] support building the distribution locally via the create_release_files.sh script

This closes #3049
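
The change adds an IS_LOCAL_DIST switch (plus a configurable GIT_REPO and
--scala-version/--hadoop-version parameters) so that a distribution can be
built locally without uploading anything to Apache infrastructure. Per the
usage() text added below, a local build is invoked as follows (version
numbers are examples only):

    NEW_VERSION=1.2.0 RELEASE_BRANCH=master OLD_VERSION=1.2-SNAPSHOT \
    GPG_PASSPHRASE=XXX GPG_KEY=XXX IS_LOCAL_DIST=true \
    ./create_release_files.sh --scala-version 2.11 --hadoop-version 2.7.2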


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e175fb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e175fb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e175fb3

Branch: refs/heads/release-1.2
Commit: 2e175fb301cc84ca0c3195d1fe325319b34fd57a
Parents: b323f66
Author: shijinkui <sh...@163.com>
Authored: Wed Jan 11 17:42:16 2017 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Jan 24 08:44:08 2017 +0100

----------------------------------------------------------------------
 .gitignore                    |   2 +
 tools/create_release_files.sh | 156 +++++++++++++++++++++++++++++--------
 2 files changed, 124 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e175fb3/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 9012d0a..3c9e4e8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,5 @@ out/
 /docs/.jekyll-metadata
 *.ipr
 *.iws
+tools/flink
+tools/flink-*

http://git-wip-us.apache.org/repos/asf/flink/blob/2e175fb3/tools/create_release_files.sh
----------------------------------------------------------------------
diff --git a/tools/create_release_files.sh b/tools/create_release_files.sh
index fdf50a5..ed01d97 100755
--- a/tools/create_release_files.sh
+++ b/tools/create_release_files.sh
@@ -66,16 +66,20 @@ fi
 GPG_PASSPHRASE=${GPG_PASSPHRASE:-XXX}
 GPG_KEY=${GPG_KEY:-XXX}
 GIT_AUTHOR=${GIT_AUTHOR:-"Your name <yo...@apache.org>"}
-OLD_VERSION=${OLD_VERSION:-1.1-SNAPSHOT}
-RELEASE_VERSION=${NEW_VERSION}
-RELEASE_CANDIDATE=${RELEASE_CANDIDATE:-rc1}
+OLD_VERSION=${OLD_VERSION:-1.2-SNAPSHOT}
+RELEASE_VERSION=${NEW_VERSION:-1.3-SNAPSHOT}
+RELEASE_CANDIDATE=${RELEASE_CANDIDATE:-none}
 RELEASE_BRANCH=${RELEASE_BRANCH:-master}
 USER_NAME=${USER_NAME:-yourapacheidhere}
 MVN=${MVN:-mvn}
 GPG=${GPG:-gpg}
 sonatype_user=${sonatype_user:-yourapacheidhere}
 sonatype_pw=${sonatype_pw:-XXX}
-
+# whether to only build the distribution locally instead of releasing to Apache
+IS_LOCAL_DIST=${IS_LOCAL_DIST:-false}
+GIT_REPO=${GIT_REPO:-git-wip-us.apache.org/repos/asf/flink.git}
+SCALA_VERSION=none
+HADOOP_VERSION=none
 
 if [ "$(uname)" == "Darwin" ]; then
     SHASUM="shasum -a 512"
@@ -85,18 +89,81 @@ else
     MD5SUM="md5sum"
 fi
 
+usage() {
+  set +x
+  echo "./create_release_files.sh --scala-version 2.11 --hadoop-version 2.7.2"
+  echo ""
+  echo "usage:"
+  echo "[--scala-version <version>] [--hadoop-version <version>]"
+  echo ""
+  echo "example 1: build apache release"
+  echo "  sonatype_user=APACHEID sonatype_pw=APACHEIDPASSWORD \ "
+  echo "  NEW_VERSION=1.2.0 RELEASE_CANDIDATE="rc1" RELEASE_BRANCH=release-1.2.0 OLD_VERSION=1.1-SNAPSHOT \ "
+  echo "  USER_NAME=APACHEID GPG_PASSPHRASE=XXX GPG_KEY=KEYID \ "
+  echo "  GIT_AUTHOR=\"`git config --get user.name` <`git config --get user.email`>\" \ "
+  echo "  GIT_REPO=github.com/apache/flink.git \ "
+  echo "  ./create_release_files.sh --scala-version 2.11 --hadoop-version 2.7.2"
+  echo ""
+  echo "example 2: build local release"
+  echo "  NEW_VERSION=1.2.0 RELEASE_BRANCH=master OLD_VERSION=1.2-SNAPSHOT \ "
+  echo "  GPG_PASSPHRASE=XXX GPG_KEY=XXX IS_LOCAL_DIST=true \ "
+  echo "  ./create_release_files.sh --scala-version 2.11 --hadoop-version 2.7.2"
+
+  exit 1
+}
+
+# Parse arguments
+while (( "$#" )); do
+  case $1 in
+    --scala-version)
+      SCALA_VERSION="$2"
+      shift
+      ;;
+    --hadoop-version)
+      HADOOP_VERSION="$2"
+      shift
+      ;;
+    --help)
+      usage
+      ;;
+    *)
+      break
+      ;;
+  esac
+  shift
+done
+
+###########################
 
 prepare() {
   # prepare
-  git clone http://git-wip-us.apache.org/repos/asf/flink.git flink
+  target_branch=release-$RELEASE_VERSION
+  if [ "$RELEASE_CANDIDATE" != "none" ]; then
+    target_branch=$target_branch-$RELEASE_CANDIDATE
+  fi
+
+  if [ ! -d ./flink ]; then
+    git clone http://$GIT_REPO flink
+  else
+    # if the flink git repo already exists, delete the target branch and any built distribution
+    rm -rf flink-*.tgz
+    cd flink
+    # try/catch: the target branch may not exist yet, so tolerate failures
+    {
+      git pull --all
+      git checkout master
+      git branch -D $target_branch -f
+    } || {
+      echo "branch $target_branch not found"
+    }
+    cd ..
+  fi
+
   cd flink
-  git checkout -b "release-$RELEASE_VERSION-$RELEASE_CANDIDATE" origin/$RELEASE_BRANCH
-  rm -f .gitignore
-  rm -f .gitattributes
-  rm -f .travis.yml
-  rm -f deploysettings.xml
-  rm -f CHANGELOG
-  rm -rf .github
+
+  git checkout -b $target_branch origin/$RELEASE_BRANCH
+  rm -rf .gitignore .gitattributes .travis.yml deploysettings.xml CHANGELOG .github
+
   cd ..
 }
 
@@ -117,10 +184,13 @@ make_source_release() {
   perl -pi -e "s#^version_short: .*#version_short: ${NEW_VERSION}#" _config.yml
   cd ..
 
-  git commit --author="$GIT_AUTHOR" -am "Commit for release $RELEASE_VERSION"
-  git remote add asf_push https://$USER_NAME@git-wip-us.apache.org/repos/asf/flink.git
-  RELEASE_HASH=`git rev-parse HEAD`
-  echo "Echo created release hash $RELEASE_HASH"
+  # a local dist does not need to be committed and pushed to the remote
+  if [ "$IS_LOCAL_DIST" == "false" ]; then
+    git commit --author="$GIT_AUTHOR" -am "Commit for release $RELEASE_VERSION"
+    git remote add asf_push https://$USER_NAME@$GIT_REPO
+    RELEASE_HASH=`git rev-parse HEAD`
+    echo "Created release hash $RELEASE_HASH"
+  fi
 
   cd ..
 
@@ -134,7 +204,7 @@ make_source_release() {
   rm -rf flink-$RELEASE_VERSION
 }
 
-
+# build maven package, create Flink distribution, generate signature
 make_binary_release() {
   NAME=$1
   FLAGS=$2
@@ -151,8 +221,8 @@ make_binary_release() {
   # enable release profile here (to check for the maven version)
   $MVN clean package $FLAGS -DskipTests -Prelease -Dgpg.skip
 
-  cd flink-dist/target/flink-$RELEASE_VERSION-bin/
-  tar czf "${dir_name}.tgz" flink-$RELEASE_VERSION
+  cd flink-dist/target/flink-*-bin/
+  tar czf "${dir_name}.tgz" flink-*
 
   cp flink-*.tgz ../../../../
   cd ../../../../
@@ -164,7 +234,6 @@ make_binary_release() {
     --detach-sig "${dir_name}.tgz"
   $MD5SUM "${dir_name}.tgz" > "${dir_name}.tgz.md5"
   $SHASUM "${dir_name}.tgz" > "${dir_name}.tgz.sha"
-
 }
 
 deploy_to_maven() {
@@ -188,7 +257,11 @@ deploy_to_maven() {
 copy_data() {
   # Copy data
   echo "Copying release tarballs"
-  folder=flink-$RELEASE_VERSION-$RELEASE_CANDIDATE
+  folder=flink-$RELEASE_VERSION
+  # if the release candidate is not "none", append it
+  if [ "$RELEASE_CANDIDATE" != "none" ]; then
+    folder=$folder-$RELEASE_CANDIDATE
+  fi
   sftp $USER_NAME@home.apache.org <<EOF
 mkdir public_html/$folder
 put flink-*.tgz* public_html/$folder
@@ -201,19 +274,34 @@ prepare
 
 make_source_release
 
-make_binary_release "hadoop2" "" 2.10
-make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" 2.10
-make_binary_release "hadoop26" "-Dhadoop.version=2.6.3" 2.10
-make_binary_release "hadoop27" "-Dhadoop.version=2.7.2" 2.10
-
-make_binary_release "hadoop2" "" 2.11
-make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" 2.11
-make_binary_release "hadoop26" "-Dhadoop.version=2.6.3" 2.11
-make_binary_release "hadoop27" "-Dhadoop.version=2.7.2" 2.11
-
-copy_data
-
-deploy_to_maven
+# build the dist according to the "--scala-version xxx --hadoop-version xxx" input parameters
+if [ "$SCALA_VERSION" == "none" ] && [ "$HADOOP_VERSION" == "none" ]; then
+  make_binary_release "hadoop2" "" "2.10"
+  make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" "2.10"
+  make_binary_release "hadoop26" "-Dhadoop.version=2.6.3" "2.10"
+  make_binary_release "hadoop27" "-Dhadoop.version=2.7.2" "2.10"
+
+  make_binary_release "hadoop2" "" "2.11"
+  make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" "2.11"
+  make_binary_release "hadoop26" "-Dhadoop.version=2.6.3" "2.11"
+  make_binary_release "hadoop27" "-Dhadoop.version=2.7.2" "2.11"
+elif [ "$SCALA_VERSION" == "none" ] && [ "$HADOOP_VERSION" != "none" ]; then
+  make_binary_release "hadoop2" "-Dhadoop.version=$HADOOP_VERSION" "2.10"
+  make_binary_release "hadoop2" "-Dhadoop.version=$HADOOP_VERSION" "2.11"
+elif [ "$SCALA_VERSION" != "none" ] && [ "$HADOOP_VERSION" == "none" ]; then
+  make_binary_release "hadoop2" "" "$SCALA_VERSION"
+  make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" "$SCALA_VERSION"
+  make_binary_release "hadoop26" "-Dhadoop.version=2.6.3" "$SCALA_VERSION"
+  make_binary_release "hadoop27" "-Dhadoop.version=2.7.2" "$SCALA_VERSION"
+else
+  make_binary_release "hadoop2x" "-Dhadoop.version=$HADOOP_VERSION" "$SCALA_VERSION"
+fi
 
+if [ "$IS_LOCAL_DIST" == "false" ] ; then
+    copy_data
+    deploy_to_maven
+fi
 
 echo "Done. Don't forget to commit the release version"