You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/05/11 06:21:24 UTC
[2/2] tajo git commit: TAJO-1686: Allow Tajo to use Hive UDF.
TAJO-1686: Allow Tajo to use Hive UDF.
Closes #929
Signed-off-by: Jihoon Son <ji...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0f4bbb60
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0f4bbb60
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0f4bbb60
Branch: refs/heads/master
Commit: 0f4bbb60cd6c6ce22771a22683cfb544e25abe46
Parents: 45802cc
Author: Jongyoung Park <em...@gmail.com>
Authored: Wed May 11 15:20:28 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed May 11 15:20:28 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/catalog/FunctionDesc.java | 13 +-
.../tajo/catalog/FunctionDescBuilder.java | 105 +++++++++++
.../tajo/function/FunctionInvocation.java | 37 ++--
.../apache/tajo/function/FunctionSignature.java | 16 ++
.../tajo/function/PythonInvocationDesc.java | 104 -----------
.../apache/tajo/function/UDFInvocationDesc.java | 119 ++++++++++++
.../src/main/proto/CatalogProtos.proto | 15 +-
tajo-common/pom.xml | 4 +
.../java/org/apache/tajo/conf/TajoConf.java | 3 +
.../tajo/util/datetime/DateTimeConstants.java | 18 +-
.../apache/tajo/util/datetime/DateTimeUtil.java | 46 ++---
tajo-core-tests/pom.xml | 4 +
.../apache/tajo/engine/eval/ExprTestBase.java | 16 +-
.../engine/function/TestFunctionLoader.java | 42 -----
.../engine/function/hiveudf/HiveUDFtest.java | 47 +++++
.../hiveudf/TestHiveFunctionLoader.java | 49 +++++
.../function/hiveudf/TestHiveFunctions.java | 102 +++++++++++
.../src/test/resources/hiveudf/my_udf_src.zip | Bin 0 -> 5599 bytes
.../src/test/resources/hiveudf/myudf.jar | Bin 0 -> 5519 bytes
tajo-core/pom.xml | 8 +
.../tajo/engine/function/FunctionLoader.java | 56 +-----
.../function/hiveudf/HiveFunctionLoader.java | 179 +++++++++++++++++++
.../java/org/apache/tajo/master/TajoMaster.java | 53 +++++-
.../java/org/apache/tajo/worker/TajoWorker.java | 3 +-
tajo-plan/pom.xml | 4 +
.../org/apache/tajo/plan/LogicalPlanner.java | 16 +-
.../tajo/plan/function/FunctionInvoke.java | 2 +
.../tajo/plan/function/GeneralFunction.java | 5 -
.../tajo/plan/function/HiveFunctionInvoke.java | 121 +++++++++++++
.../function/python/PythonScriptEngine.java | 16 +-
.../tajo/plan/util/WritableTypeConverter.java | 144 +++++++++++++++
.../plan/util/TestWritableTypeConverter.java | 96 ++++++++++
tajo-project/pom.xml | 12 +-
.../thirdparty/orc/TreeReaderFactory.java | 2 +-
.../tajo/storage/thirdparty/orc/WriterImpl.java | 2 +-
36 files changed, 1156 insertions(+), 305 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6469e9f..eec145c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.12.0 - unreleased
NEW FEATURES
+ TAJO-1686: Allow Tajo to use Hive UDF. (jihoon)
+
TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
TAJO-2109: Implement Radix sort. (jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
index 8c10418..11641a2 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
@@ -46,9 +46,9 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
public FunctionDesc() {
}
- public FunctionDesc(String signature, Class<? extends Function> clazz,
+ public FunctionDesc(String funcName, Class<? extends Function> clazz,
FunctionType funcType, DataType retType, @NotNull DataType [] params) {
- this.signature = new FunctionSignature(funcType, signature.toLowerCase(), retType, params);
+ this.signature = new FunctionSignature(funcType, funcName.toLowerCase(), retType, params);
this.invocation = new FunctionInvocation();
this.invocation.setLegacy(new ClassBaseInvocationDesc<>(clazz));
this.supplement = new FunctionSupplement();
@@ -116,6 +116,10 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
return signature.getReturnType();
}
+ public boolean isDeterministic() {
+ return signature.isDeterministic();
+ }
+
////////////////////////////////////////
// Invocation
////////////////////////////////////////
@@ -161,7 +165,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
public int hashCodeWithoutType() {
return signature.hashCodeWithoutType();
}
-
+
@Override
public boolean equals(Object obj) {
if (obj instanceof FunctionDesc) {
@@ -174,7 +178,8 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
public boolean equalsSignature(Object obj) {
if (obj instanceof FunctionDesc) {
- return this.getSignature().equalsWithoutType(((FunctionDesc) obj).getSignature());
+ FunctionSignature targetSig = ((FunctionDesc)obj).getSignature();
+ return this.getSignature().equalsWithoutType(targetSig);
}
return false;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDescBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDescBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDescBuilder.java
new file mode 100644
index 0000000..dae97ab
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDescBuilder.java
@@ -0,0 +1,105 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.function.*;
+
+public class FunctionDescBuilder {
+ private String name = null;
+ private FunctionType funcType = null;
+ private DataType retType = null;
+ private DataType [] params = null;
+ private Class<? extends Function> clazz = null;
+ private UDFInvocationDesc udfInvocation = null;
+ private boolean isDeterministic = true;
+ private String description = null;
+ private String example = null;
+
+ public FunctionDescBuilder setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public FunctionDescBuilder setFunctionType(FunctionType type) {
+ funcType = type;
+ return this;
+ }
+
+ public FunctionDescBuilder setReturnType(DataType type) {
+ retType = type;
+ return this;
+ }
+
+ public FunctionDescBuilder setParams(DataType [] params) {
+ this.params = params;
+ return this;
+ }
+
+ public FunctionDescBuilder setClass(Class<? extends Function> clazz) {
+ this.clazz = clazz;
+ return this;
+ }
+
+ public FunctionDescBuilder setDeterministic(boolean isDeterministic) {
+ this.isDeterministic = isDeterministic;
+ return this;
+ }
+
+ public FunctionDescBuilder setDescription(String desc) {
+ this.description = desc;
+ return this;
+ }
+
+ public FunctionDescBuilder setExample(String ex) {
+ this.example = ex;
+ return this;
+ }
+
+ public FunctionDescBuilder setUDF(UDFInvocationDesc udf) {
+ this.udfInvocation = udf;
+ return this;
+ }
+
+
+ public FunctionDesc build() {
+ FunctionInvocation invocation = new FunctionInvocation();
+ if (funcType == FunctionType.UDF) {
+ invocation.setUDF(udfInvocation);
+ }
+ else {
+ invocation.setLegacy(new ClassBaseInvocationDesc<>(clazz));
+ }
+
+ FunctionSupplement supplement = new FunctionSupplement();
+
+ if (description != null) {
+ supplement.setShortDescription(description);
+ }
+
+ if (example != null) {
+ supplement.setExample(example);
+ }
+
+ FunctionSignature signature = new FunctionSignature(funcType, name, retType, isDeterministic, params);
+
+ return new FunctionDesc(signature, invocation, supplement);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java
index 09b056b..835fed4 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionInvocation.java
@@ -20,6 +20,7 @@ package org.apache.tajo.function;
import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionInvocationProto;
@@ -36,7 +37,7 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto>
@Expose
ClassBaseInvocationDesc<?> aggregationJIT;
@Expose
- PythonInvocationDesc python;
+ UDFInvocationDesc udf;
public FunctionInvocation() {
}
@@ -57,8 +58,8 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto>
if (proto.hasAggregationJit()) {
this.aggregationJIT = new ClassBaseInvocationDesc(proto.getAggregation());
}
- if (proto.hasPython()) {
- this.python = new PythonInvocationDesc(proto.getPython());
+ if (proto.hasUdfInvocation()) {
+ this.udf = new UDFInvocationDesc(proto.getUdfInvocation());
}
}
@@ -127,27 +128,23 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto>
}
public boolean hasPython() {
- return python != null && python.isScalarFunction();
+ return udf != null && udf.getType() == CatalogProtos.UDFtype.PYTHON && udf.isScalarFunction();
}
- public void setPython(PythonInvocationDesc python) {
- this.python = python;
+ public boolean hasHiveUDF() {
+ return udf != null && udf.getType() == CatalogProtos.UDFtype.HIVE && udf.isScalarFunction();
}
- public PythonInvocationDesc getPython() {
- return python;
+ public void setUDF(UDFInvocationDesc udf) {
+ this.udf = udf;
}
- public boolean hasPythonAggregation() {
- return python != null && !python.isScalarFunction();
- }
-
- public void setPythonAggregation(PythonInvocationDesc pythonAggregation) {
- this.python = pythonAggregation;
+ public UDFInvocationDesc getUDF() {
+ return udf;
}
- public PythonInvocationDesc getPythonAggregation() {
- return this.python;
+ public boolean hasPythonAggregation() {
+ return udf != null && udf.getType() == CatalogProtos.UDFtype.PYTHON && !udf.isScalarFunction();
}
@Override
@@ -168,20 +165,20 @@ public class FunctionInvocation implements ProtoObject<FunctionInvocationProto>
if (hasAggregationJIT()) {
builder.setAggregationJit(aggregationJIT.getProto());
}
- if (hasPython() || hasPythonAggregation()) {
- builder.setPython(python.getProto());
+ if (hasPython() || hasPythonAggregation() || hasHiveUDF()) {
+ builder.setUdfInvocation(udf.getProto());
}
return builder.build();
}
@Override
public int hashCode() {
- return Objects.hashCode(legacy, scalar, scalarJIT, python);
+ return Objects.hashCode(legacy, scalar, scalarJIT, udf);
}
public String toString() {
return "legacy=" + hasLegacy() + ",scalar=" + hasScalar() + ",agg=" + hasAggregation() +
",scalarJIT=" + hasScalarJIT() + ",aggJIT=" + hasAggregationJIT() + ",python=" + hasPython() +
- ",aggPython=" + hasPythonAggregation();
+ ",aggPython=" + hasPythonAggregation() + ",hiveUDF="+hasHiveUDF();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionSignature.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionSignature.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionSignature.java
index bf37647..7276d45 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionSignature.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/FunctionSignature.java
@@ -42,6 +42,10 @@ public class FunctionSignature implements Comparable<FunctionSignature>, ProtoOb
@Expose
private DataType returnType;
+ // for future use
+ @Expose
+ private boolean deterministic = true;
+
public FunctionSignature(FunctionType type, String name, DataType returnType, @NotNull DataType... params) {
this.functionType = type;
this.name = name;
@@ -54,6 +58,14 @@ public class FunctionSignature implements Comparable<FunctionSignature>, ProtoOb
this.name = proto.getName();
this.paramTypes = proto.getParameterTypesList().toArray(new DataType[proto.getParameterTypesCount()]);
this.returnType = proto.getReturnType();
+ if (proto.hasDeterministic()) {
+ this.deterministic = proto.getDeterministic();
+ }
+ }
+
+ public FunctionSignature(FunctionType type, String name, DataType returnType, boolean deterministic, @NotNull DataType... params) {
+ this(type, name, returnType, params);
+ this.deterministic = deterministic;
}
public FunctionType getFunctionType() {
@@ -72,6 +84,10 @@ public class FunctionSignature implements Comparable<FunctionSignature>, ProtoOb
return returnType;
}
+ public boolean isDeterministic() {
+ return deterministic;
+ }
+
@Override
public int hashCode() {
return Objects.hashCode(functionType, name, returnType, Objects.hashCode(paramTypes));
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java
deleted file mode 100644
index d3365e5..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/PythonInvocationDesc.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.function;
-
-import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.proto.CatalogProtos.PythonInvocationDescProto;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.util.TUtil;
-
-/**
- * <code>PythonInvocationDesc</code> describes a function name
- * and a file path to the script where the function is defined.
- */
-public class PythonInvocationDesc implements ProtoObject<PythonInvocationDescProto>, Cloneable {
- @Expose private boolean isScalarFunction; // true if udf, false if udaf
- @Expose private String funcOrClassName; // function name if udf, class name if udaf
- @Expose private String filePath; // file path to the python module
-
- /**
- * Constructor of {@link PythonInvocationDesc}.
- *
- * @param funcOrClassName if udf, function name. else, class name.
- * @param filePath path to script file
- * @param isScalarFunction
- */
- public PythonInvocationDesc(String funcOrClassName, String filePath, boolean isScalarFunction) {
- this.funcOrClassName = funcOrClassName;
- this.filePath = filePath;
- this.isScalarFunction = isScalarFunction;
- }
-
- public void setFilePath(String filePath) {
- this.filePath = filePath;
- }
-
- public PythonInvocationDesc(PythonInvocationDescProto proto) {
- this(proto.getFuncName(), proto.getFilePath(), proto.getIsScalarFunction());
- }
-
- public String getName() {
- return funcOrClassName;
- }
-
- public String getPath() {
- return filePath;
- }
-
- public boolean isScalarFunction() {
- return this.isScalarFunction;
- }
-
- @Override
- public PythonInvocationDescProto getProto() {
- PythonInvocationDescProto.Builder builder = PythonInvocationDescProto.newBuilder();
- builder.setFuncName(funcOrClassName).setFilePath(filePath).setIsScalarFunction(isScalarFunction);
- return builder.build();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof PythonInvocationDesc) {
- PythonInvocationDesc other = (PythonInvocationDesc) o;
- return TUtil.checkEquals(funcOrClassName, other.funcOrClassName) &&
- TUtil.checkEquals(filePath, other.filePath) && isScalarFunction == other.isScalarFunction;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(funcOrClassName, filePath, isScalarFunction);
- }
-
- @Override
- public String toString() {
- return isScalarFunction ? "[UDF] " : "[UDAF] " + funcOrClassName + " at " + filePath;
- }
-
- @Override
- public Object clone() throws CloneNotSupportedException {
- PythonInvocationDesc clone = (PythonInvocationDesc) super.clone();
- clone.funcOrClassName = funcOrClassName == null ? null : funcOrClassName;
- clone.filePath = filePath == null ? null : filePath;
- clone.isScalarFunction = isScalarFunction;
- return clone;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/UDFInvocationDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/UDFInvocationDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/UDFInvocationDesc.java
new file mode 100644
index 0000000..c57194c
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/function/UDFInvocationDesc.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.function;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.proto.CatalogProtos.UDFtype;
+import org.apache.tajo.catalog.proto.CatalogProtos.UDFinvocationDescProto;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * <code>UDFInvocationDesc</code> describes a function name
+ * and a file path to the script where the function is defined.
+ */
+public class UDFInvocationDesc implements ProtoObject<UDFinvocationDescProto>, Cloneable {
+ @Expose private UDFtype type;
+ @Expose private boolean isScalarFunction; // true if udf, false if udaf
+ @Expose private String funcOrClassName; // function name if python udf, class name if python udaf or others
+ @Expose private String filePath; // file path to the python module, or jar path to hive udf
+
+ public UDFInvocationDesc(UDFtype type, String funcOrClassName, boolean isScalarFunction) {
+ this(type, funcOrClassName, null, isScalarFunction);
+ }
+
+ /**
+ * Constructor of {@link UDFInvocationDesc}.
+ *
+ * @param funcOrClassName if udf, function name. else, class name.
+ * @param filePath path to script file
+ * @param isScalarFunction
+ */
+ public UDFInvocationDesc(UDFtype type, String funcOrClassName, String filePath, boolean isScalarFunction) {
+ this.type = type;
+ this.funcOrClassName = funcOrClassName;
+ this.filePath = filePath;
+ this.isScalarFunction = isScalarFunction;
+ }
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public UDFInvocationDesc(UDFinvocationDescProto proto) {
+ this(proto.getType(), proto.getFuncName(), proto.getFilePath(), proto.getIsScalarFunction());
+ }
+
+ public UDFtype getType() {
+ return type;
+ }
+
+ public String getName() {
+ return funcOrClassName;
+ }
+
+ public String getPath() {
+ return filePath;
+ }
+
+ public boolean isScalarFunction() {
+ return this.isScalarFunction;
+ }
+
+ @Override
+ public UDFinvocationDescProto getProto() {
+ UDFinvocationDescProto.Builder builder = UDFinvocationDescProto.newBuilder();
+ builder.setType(type).setFuncName(funcOrClassName).setIsScalarFunction(isScalarFunction);
+ if (filePath != null) {
+ builder.setFilePath(filePath);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof UDFInvocationDesc) {
+ UDFInvocationDesc other = (UDFInvocationDesc) o;
+ return TUtil.checkEquals(funcOrClassName, other.funcOrClassName) &&
+ TUtil.checkEquals(filePath, other.filePath) && isScalarFunction == other.isScalarFunction;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(funcOrClassName, filePath, isScalarFunction);
+ }
+
+ @Override
+ public String toString() {
+ return '['+type.toString()+'/'+ (isScalarFunction ? "UDF] " : "UDAF] ") +
+ funcOrClassName + (filePath != null ? (" at " + filePath) : "");
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ UDFInvocationDesc clone = (UDFInvocationDesc) super.clone();
+ clone.funcOrClassName = funcOrClassName == null ? null : funcOrClassName;
+ clone.filePath = filePath == null ? null : filePath;
+ clone.isScalarFunction = isScalarFunction;
+ return clone;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 2b34095..d064c62 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -359,6 +359,7 @@ message FunctionSignatureProto {
required string name = 2;
required DataType return_type = 3;
repeated DataType parameter_types = 4;
+ optional bool deterministic = 5;
}
message FunctionSupplementProto {
@@ -373,7 +374,7 @@ message FunctionInvocationProto {
optional ClassBaseInvocationDescProto aggregation = 3;
optional StaticMethodInvocationDescProto scalar_jit = 4;
optional ClassBaseInvocationDescProto aggregation_jit = 5;
- optional PythonInvocationDescProto python = 6;
+ optional UDFinvocationDescProto udf_invocation = 6;
}
message ClassBaseInvocationDescProto {
@@ -387,10 +388,16 @@ message StaticMethodInvocationDescProto {
repeated string param_classes = 4;
}
-message PythonInvocationDescProto {
- required string func_name = 1;
- required string file_path = 2;
+enum UDFtype {
+ PYTHON = 0;
+ HIVE = 1;
+}
+
+message UDFinvocationDescProto {
+ required UDFtype type = 1;
+ required string func_name = 2;
required bool is_scalar_function = 3;
+ optional string file_path = 4;
}
message TableResponse {
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-common/pom.xml b/tajo-common/pom.xml
index 06703b3..cf4eb36 100644
--- a/tajo-common/pom.xml
+++ b/tajo-common/pom.xml
@@ -226,6 +226,10 @@
<artifactId>snappy</artifactId>
</dependency>
<dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 6237bd1..6e3eaea 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -303,6 +303,9 @@ public class TajoConf extends Configuration {
PYTHON_CODE_DIR("tajo.function.python.code-dir", ""),
PYTHON_CONTROLLER_LOG_DIR("tajo.function.python.controller.log-dir", ""),
+ // HIVE UDF
+ HIVE_UDF_DIR("tajo.function.hive.code-dir", "./lib/hiveudf"),
+
// Partition
PARTITION_DYNAMIC_BULK_INSERT_BATCH_SIZE("tajo.partition.dynamic.bulk-insert.batch-size", 1000),
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeConstants.java b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeConstants.java
index e610dee..002ae0b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeConstants.java
@@ -55,18 +55,18 @@ public class DateTimeConstants {
/** avoid floating-point computation */
public static final int SECS_PER_YEAR = 36525 * 864;
- public static final int SECS_PER_DAY = 86400;
+ public static final int SECS_PER_DAY = 86_400;
public static final int SECS_PER_HOUR = 3600;
public static final int SECS_PER_MINUTE = 60;
public static final int MINS_PER_HOUR = 60;
- public static final long MSECS_PER_DAY = 86400000L;
+ public static final long MSECS_PER_DAY = 86_400_000L;
public static final long MSECS_PER_SEC = 1000L;
- public static final long USECS_PER_DAY = 86400000000L;
- public static final long USECS_PER_HOUR = 3600000000L;
- public static final long USECS_PER_MINUTE = 60000000L;
- public static final long USECS_PER_SEC = 1000000L;
+ public static final long USECS_PER_DAY = 86_400_000_000L;
+ public static final long USECS_PER_HOUR = 3_600_000_000L;
+ public static final long USECS_PER_MINUTE = 60_000_000L;
+ public static final long USECS_PER_SEC = 1_000_000L;
public static final long USECS_PER_MSEC = 1000L;
public static final int JULIAN_MINYEAR = -4713;
@@ -100,11 +100,11 @@ public class DateTimeConstants {
// Julian-date equivalents of Day 0 in Unix and Postgres reckoning
/** == DateTimeUtil.toJulianDate(1970, 1, 1) */
- public static final int UNIX_EPOCH_JDATE = 2440588;
+ public static final int UNIX_EPOCH_JDATE = 2_440_588;
/** == DateTimeUtil.toJulianDate(2000, 1, 1) */
- public static final int POSTGRES_EPOCH_JDATE = 2451545;
+ public static final int POSTGRES_EPOCH_JDATE = 2_451_545;
/** == (POSTGRES_EPOCH_JDATE * SECS_PER_DAY) - (UNIX_EPOCH_JDATE * SECS_PER_DAY); */
- public static final long SECS_DIFFERENCE_BETWEEN_JULIAN_AND_UNIXTIME = 946684800;
+ public static final long SECS_DIFFERENCE_BETWEEN_JULIAN_AND_UNIXTIME = 946_684_800;
public static final int MAX_TZDISP_HOUR = 15; /* maximum allowed hour part */
public static final int TZDISP_LIMIT = ((MAX_TZDISP_HOUR + 1) * SECS_PER_HOUR);
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java
index 0bf58aa..8f43c7e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java
@@ -44,8 +44,6 @@ public class DateTimeUtil {
/** maximum possible number of fields in a date * string */
private static int MAXDATEFIELDS = 25;
- public final static int DAYS_FROM_JULIAN_TO_EPOCH = 2440588;
-
public static boolean isJulianCalendar(int year, int month, int day) {
return year <= 1752 && month <= 9 && day < 14;
}
@@ -308,12 +306,12 @@ public class DateTimeUtil {
/**
* Converts julian timestamp to java timestamp.
- * @param timestamp
- * @return
+ * @param timestamp julian time in millisecond
+ * @return java time in millisecond
*/
public static long julianTimeToJavaTime(long timestamp) {
double totalSecs = (double)timestamp / (double)DateTimeConstants.MSECS_PER_SEC;
- return (long)(Math.round(totalSecs + DateTimeConstants.SECS_DIFFERENCE_BETWEEN_JULIAN_AND_UNIXTIME * 1000.0));
+ return Math.round(totalSecs + DateTimeConstants.SECS_DIFFERENCE_BETWEEN_JULIAN_AND_UNIXTIME * 1000.0);
}
/**
@@ -2137,43 +2135,28 @@ public class DateTimeUtil {
}
+/** Truncates {@code dateTime} to 00:00:00 of its day; returns java (Unix epoch) time in microseconds. */
public static long getDay(TimeMeta dateTime) {
- long usecs = 0;
-
- usecs = julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth,
+ return julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth,
0, 0, 0, 0)) * DateTimeConstants.USECS_PER_MSEC;
- return usecs;
}
+/** Truncates {@code dateTime} to the start of its hour; returns java (Unix epoch) time in microseconds. */
public static long getHour(TimeMeta dateTime) {
- long usecs = 0;
-
- usecs = julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth,
+ return julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth,
dateTime.hours, 0, 0, 0)) * DateTimeConstants.USECS_PER_MSEC;
- return usecs;
}
+/** Truncates {@code dateTime} to the start of its minute; returns java (Unix epoch) time in microseconds. */
public static long getMinute(TimeMeta dateTime) {
- long usecs = 0;
-
- usecs = julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth,
+ return julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth,
dateTime.hours, dateTime.minutes, 0, 0)) * DateTimeConstants.USECS_PER_MSEC;
- return usecs;
}
+/** Drops the sub-second part of {@code dateTime}; returns java (Unix epoch) time in microseconds. */
public static long getSecond(TimeMeta dateTime) {
- long usecs = 0;
-
- usecs = julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth,
+ return julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth,
dateTime.hours, dateTime.minutes, dateTime.secs, 0)) * DateTimeConstants.USECS_PER_MSEC;
- return usecs;
}
+/** Returns day 1, 00:00:00 of the month of {@code dateTime} as java (Unix epoch) time in microseconds. */
public static long getMonth(TimeMeta dateTime) {
- long usecs = 0;
-
- usecs = julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, 1, 0, 0, 0, 0)) *
+ return julianTimeToJavaTime(toJulianTimestamp(dateTime.years, dateTime.monthOfYear, 1, 0, 0, 0, 0)) *
DateTimeConstants.USECS_PER_MSEC;
- return usecs;
}
+/**
+ * Returns 00:00:00 of the day {@code weekday - 1} days after the day returned by isoweek2j for the ISO
+ * week containing {@code dateTime} (presumably the week's first day — confirm), as java time in microseconds.
+ */
public static long getDayOfWeek(TimeMeta dateTime, int weekday) {
@@ -2183,22 +2166,17 @@ public class DateTimeUtil {
int week = date2isoweek(dateTime.years, dateTime.monthOfYear, dateTime.dayOfMonth);
int jday = isoweek2j(dateTime.years, week);
- long usecs = 0;
-
+
jday += (weekday - 1);
jday -= DateTimeConstants.POSTGRES_EPOCH_JDATE;
- usecs = julianTimeToJavaTime(toJulianTimestamp(jday, 0, 0, 0, 0)) *
- DateTimeConstants.USECS_PER_MSEC;
- return usecs;
+
+ return julianTimeToJavaTime(toJulianTimestamp(jday, 0, 0, 0, 0)) * DateTimeConstants.USECS_PER_MSEC;
}
+/** Returns January 1st, 00:00:00 of the year of {@code dateTime} as java (Unix epoch) time in microseconds. */
public static long getYear(TimeMeta dateTime) {
- long usecs = 0;
-
- usecs = julianTimeToJavaTime(toJulianTimestamp(dateTime.years, 1, 1, 0, 0, 0, 0)) *
+ return julianTimeToJavaTime(toJulianTimestamp(dateTime.years, 1, 1, 0, 0, 0, 0)) *
DateTimeConstants.USECS_PER_MSEC;
- return usecs;
}
public static TimeMeta getUTCDateTime(Int8Datum int8Datum){
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index 3554df4..74106d1 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -327,6 +327,10 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- Hive UDF API (org.apache.hadoop.hive.ql.exec.UDF etc.) needed by the Hive UDF tests such as HiveUDFtest. -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </dependency>
<dependency>
<groupId>com.github.stephenc.jcip</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 38911c4..723a87f 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.eval;
+import com.google.common.base.Preconditions;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.tajo.*;
@@ -32,6 +33,7 @@ import org.apache.tajo.datum.*;
import org.apache.tajo.engine.codegen.EvalCodeGenerator;
import org.apache.tajo.engine.codegen.TajoClassLoader;
import org.apache.tajo.engine.function.FunctionLoader;
+import org.apache.tajo.engine.function.hiveudf.HiveFunctionLoader;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
@@ -61,6 +63,7 @@ import org.junit.BeforeClass;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -97,7 +100,14 @@ public class ExprTestBase {
cat.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
Map<FunctionSignature, FunctionDesc> map = FunctionLoader.loadBuiltinFunctions();
List<FunctionDesc> list = new ArrayList<>(map.values());
- list.addAll(FunctionLoader.loadUserDefinedFunctions(conf));
+ list.addAll(FunctionLoader.loadUserDefinedFunctions(conf).orElse(new ArrayList<>()));
+
+ // load Hive UDFs from the "hiveudf" test resource directory (it must be on the classpath)
+ URL hiveUDFURL = ClassLoader.getSystemResource("hiveudf");
+ Preconditions.checkNotNull(hiveUDFURL, "hive udf directory is absent.");
+ // NOTE(review): stripping the "file:" prefix assumes the resource resolves to a plain file: URL
+ // (not an entry inside a jar) whose path needs no URL-decoding (e.g. no %20 for spaces).
+ conf.set(TajoConf.ConfVars.HIVE_UDF_DIR.varname, hiveUDFURL.toString().substring("file:".length()));
+ list.addAll(HiveFunctionLoader.loadHiveUDFs(conf).orElse(new ArrayList<>()));
+
for (FunctionDesc funcDesc : list) {
cat.createFunction(funcDesc);
}
@@ -118,6 +128,10 @@ public class ExprTestBase {
return new TajoConf(conf);
}
+ /** Exposes the test cluster to subclasses, e.g. so tests can query its catalog service. */
+ protected TajoTestingCluster getCluster() {
+ return cluster;
+ }
+
/**
* verify query syntax and get raw targets.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestFunctionLoader.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestFunctionLoader.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestFunctionLoader.java
index 26f3262..5f1ed68 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestFunctionLoader.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/TestFunctionLoader.java
@@ -41,7 +41,6 @@ import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link FunctionLoader}.
+ * NOTE(review): testAmbiguousException was removed together with FunctionLoader.mergeFunctionLists.
+ * The duplicate-UDF check now lives in TajoMaster.checkUDFduplicateAndMerge and does not appear to be
+ * covered by a test in this change.
+ */
public class TestFunctionLoader {
-
+ /** Compares the discovered built-in scalar functions with the listing in testFindScalarFunctions.result. */
@Test
public void testFindScalarFunctions() throws IOException {
List<FunctionDesc> collections = Lists.newArrayList(FunctionLoader.findScalarFunctions());
@@ -51,45 +50,4 @@ public class TestFunctionLoader {
String result = getResultText(TestFunctionLoader.class, "testFindScalarFunctions.result");
assertEquals(result.trim(), functionList.trim());
}
-
- @Test
- public void testAmbiguousException() {
- FunctionSignature signature = new FunctionSignature(CatalogProtos.FunctionType.GENERAL, "test1",
- CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT),
- CatalogUtil.newSimpleDataType(TajoDataTypes.Type.FLOAT8));
-
- FunctionInvocation invocation = new FunctionInvocation();
- FunctionSupplement supplement = new FunctionSupplement();
-
- FunctionDesc desc = new FunctionDesc(signature, invocation, supplement);
-
- List<FunctionDesc> builtins = new ArrayList<>();
- builtins.add(desc);
-
- signature = new FunctionSignature(CatalogProtos.FunctionType.GENERAL, "test2",
- CatalogUtil.newSimpleDataType(TajoDataTypes.Type.INT8),
- CatalogUtil.newSimpleDataType(TajoDataTypes.Type.INT8));
-
- desc = new FunctionDesc(signature, invocation, supplement);
- builtins.add(desc);
-
- List<FunctionDesc> udfs = new ArrayList<>();
-
- signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, "test1",
- CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT),
- CatalogUtil.newSimpleDataType(TajoDataTypes.Type.FLOAT8));
-
- desc = new FunctionDesc(signature, invocation, supplement);
- udfs.add(desc);
-
- boolean afexOccurs = false;
-
- try {
- FunctionLoader.mergeFunctionLists(builtins, udfs);
- } catch (AmbiguousFunctionException e) {
- afexOccurs = true;
- }
-
- assertTrue(afexOccurs);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/HiveUDFtest.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/HiveUDFtest.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/HiveUDFtest.java
new file mode 100644
index 0000000..590d8ec
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/HiveUDFtest.java
@@ -0,0 +1,47 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.hiveudf;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Hive UDF sample class for testing.
+ * The function is actually deterministic, but it is declared non-deterministic on purpose so that
+ * tests can check that {@code @UDFType(deterministic = false)} is picked up.
+ */
+@UDFType(deterministic = false)
+@Description(
+    name = "multiplestr",
+    value = "repeat string",
+    extended = "multiplestr(str, num)"
+)
+public class HiveUDFtest extends UDF {
+  /**
+   * Returns {@code str} repeated {@code num} times.
+   *
+   * @param str string to repeat
+   * @param num number of repetitions; zero or a negative value yields an empty string
+   * @return the repeated string, or null if either argument is null
+   */
+  public Text evaluate(Text str, IntWritable num) {
+    if (str == null || num == null) {
+      return null;
+    }
+
+    String origin = str.toString();
+    // Append exactly once per repetition; doubling an accumulator would produce 2^(num-1) copies.
+    StringBuilder repeated = new StringBuilder();
+    for (int i = 0; i < num.get(); i++) {
+      repeated.append(origin);
+    }
+
+    return new Text(repeated.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/TestHiveFunctionLoader.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/TestHiveFunctionLoader.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/TestHiveFunctionLoader.java
new file mode 100644
index 0000000..ef5d3fe
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/TestHiveFunctionLoader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.hiveudf;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.common.TajoDataTypes;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHiveFunctionLoader {
+  /**
+   * Checks that a Hive UDF class becomes a Tajo function descriptor. The name comes from
+   * {@code @Description}, determinism from {@code @UDFType}, and types from {@code evaluate()}.
+   */
+  @Test
+  public void testAnalyzeUDFclass() {
+    Set<Class<? extends UDF>> funcSet = new HashSet<>();
+    funcSet.add(HiveUDFtest.class);
+    List<FunctionDesc> funcList = new LinkedList<>();
+
+    // HiveUDFtest is on the test classpath, so no jar URL is passed.
+    HiveFunctionLoader.buildFunctionsFromUDF(funcSet, funcList, null);
+
+    // JUnit's assertEquals takes the expected value first.
+    assertEquals(1, funcList.size());
+
+    FunctionDesc desc = funcList.get(0);
+
+    assertEquals("multiplestr", desc.getFunctionName());
+    assertEquals(false, desc.isDeterministic());
+    assertEquals(TajoDataTypes.Type.TEXT, desc.getReturnType().getType());
+    assertEquals(2, desc.getParamTypes().length);
+    assertEquals(TajoDataTypes.Type.TEXT, desc.getParamTypes()[0].getType());
+    assertEquals(TajoDataTypes.Type.INT4, desc.getParamTypes()[1].getType());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/TestHiveFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/TestHiveFunctions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/TestHiveFunctions.java
new file mode 100644
index 0000000..a2c89fb
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/function/hiveudf/TestHiveFunctions.java
@@ -0,0 +1,102 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.hiveudf;
+
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.engine.eval.ExprTestBase;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHiveFunctions extends ExprTestBase {
+ @Test
+ public void testFindFunction() throws Exception {
+ CatalogService catService = getCluster().getCatalogService();
+
+ FunctionDesc desc = catService.getFunction("my_upper", CatalogProtos.FunctionType.UDF,
+ CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT));
+
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getReturnType().getType());
+ assertEquals(1, desc.getParamTypes().length);
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getParamTypes()[0].getType());
+ assertEquals("to uppercase", desc.getDescription());
+
+ TajoDataTypes.DataType int4type = CatalogUtil.newSimpleDataType(TajoDataTypes.Type.INT4);
+ desc = catService.getFunction("my_divide", CatalogProtos.FunctionType.UDF, int4type, int4type);
+
+ assertEquals(TajoDataTypes.Type.FLOAT8, desc.getReturnType().getType());
+ assertEquals(2, desc.getParamTypes().length);
+ assertEquals(TajoDataTypes.Type.INT4, desc.getParamTypes()[0].getType());
+ assertEquals(TajoDataTypes.Type.INT4, desc.getParamTypes()[1].getType());
+
+ // synonym
+ desc = catService.getFunction("test_upper", CatalogProtos.FunctionType.UDF,
+ CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT));
+
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getReturnType().getType());
+ assertEquals(1, desc.getParamTypes().length);
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getParamTypes()[0].getType());
+ assertEquals("to uppercase", desc.getDescription());
+
+ // Test for UDF without @Description and including multi 'evaluate()'
+ desc = catService.getFunction("com_example_hive_udf_MyLower", CatalogProtos.FunctionType.UDF,
+ CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT));
+
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getReturnType().getType());
+ assertEquals(1, desc.getParamTypes().length);
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getParamTypes()[0].getType());
+
+ // same function for another parameter signature
+ desc = catService.getFunction("com_example_hive_udf_MyLower", CatalogProtos.FunctionType.UDF,
+ CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT), CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT));
+
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getReturnType().getType());
+ assertEquals(2, desc.getParamTypes().length);
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getParamTypes()[0].getType());
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getParamTypes()[1].getType());
+
+ // multiple signatures
+ // my_substr has two types, (Text, IntWritable) and (Text, IntWritable, IntWritable)
+ desc = catService.getFunction("my_substr", CatalogProtos.FunctionType.UDF,
+ CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT), CatalogUtil.newSimpleDataType(TajoDataTypes.Type.INT4),
+ CatalogUtil.newSimpleDataType(TajoDataTypes.Type.INT4));
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getReturnType().getType());
+ assertEquals(3, desc.getParamTypes().length);
+
+ desc = catService.getFunction("my_substr", CatalogProtos.FunctionType.UDF,
+ CatalogUtil.newSimpleDataType(TajoDataTypes.Type.TEXT), CatalogUtil.newSimpleDataType(TajoDataTypes.Type.INT4));
+ assertEquals(TajoDataTypes.Type.TEXT, desc.getReturnType().getType());
+ assertEquals(2, desc.getParamTypes().length);
+ }
+
+ @Test
+ public void testRunFunctions() throws Exception {
+ testSimpleEval("select my_upper(null)", new String [] {"NULL"});
+ testSimpleEval("select my_upper('abcd')", new String [] {"ABCD"});
+ testSimpleEval("select my_divide(1,2)", new String [] {"0.5"});
+
+ // my_substr() uses 1-based index
+ testSimpleEval("select my_substr('abcde', 3)", new String [] {"cde"});
+ testSimpleEval("select my_substr('abcde', 1, 2)", new String [] {"ab"});
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core-tests/src/test/resources/hiveudf/my_udf_src.zip
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/hiveudf/my_udf_src.zip b/tajo-core-tests/src/test/resources/hiveudf/my_udf_src.zip
new file mode 100644
index 0000000..62c9b5c
Binary files /dev/null and b/tajo-core-tests/src/test/resources/hiveudf/my_udf_src.zip differ
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core-tests/src/test/resources/hiveudf/myudf.jar
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/hiveudf/myudf.jar b/tajo-core-tests/src/test/resources/hiveudf/myudf.jar
new file mode 100644
index 0000000..8124353
Binary files /dev/null and b/tajo-core-tests/src/test/resources/hiveudf/myudf.jar differ
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index f4e1b9e..43464e3 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -252,6 +252,10 @@
<artifactId>hadoop-yarn-server-common</artifactId>
<scope>provided</scope>
</dependency>
+ <!-- Hive UDF API used by HiveFunctionLoader. NOTE(review): no scope is given, so this is a
+      compile-scope (transitive) dependency of tajo-core; confirm it introduces no classpath conflicts. -->
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -306,6 +310,10 @@
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
+ <!-- org.reflections.Reflections: used by HiveFunctionLoader to find Hive UDF subclasses in jar files. -->
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java
index d7fbf88..c309c64 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java
@@ -36,7 +36,6 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.function.annotation.Description;
import org.apache.tajo.engine.function.annotation.ParamOptionTypes;
import org.apache.tajo.engine.function.annotation.ParamTypes;
-import org.apache.tajo.exception.AmbiguousFunctionException;
import org.apache.tajo.function.*;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
import org.apache.tajo.util.ClassUtil;
@@ -45,7 +44,6 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.*;
-import java.util.stream.Collectors;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.GENERAL;
@@ -90,7 +88,7 @@ public class FunctionLoader {
* @return the functions registered through PythonScriptEngine from the collected script paths; the
*         visible final return always wraps the (possibly empty) list. NOTE(review): confirm no early
*         return in the elided lines yields Optional.empty()
* @throws IOException presumably when the configured UDF locations cannot be read (TODO confirm)
*/
- public static List<FunctionDesc> loadUserDefinedFunctions(TajoConf conf)
+ public static Optional<List<FunctionDesc>> loadUserDefinedFunctions(TajoConf conf)
throws IOException {
List<FunctionDesc> functionList = new LinkedList<>();
@@ -117,12 +115,13 @@ public class FunctionLoader {
filePaths.add(codePath);
}
for (Path filePath : filePaths) {
- PythonScriptEngine.registerFunctions(filePath.toUri(), FunctionLoader.PYTHON_FUNCTION_NAMESPACE).forEach(functionList::add);
+ PythonScriptEngine.registerFunctions(filePath.toUri(), FunctionLoader.PYTHON_FUNCTION_NAMESPACE).
+ forEach(functionList::add);
}
}
}
- return functionList;
+ return Optional.of(functionList);
}
public static Set<FunctionDesc> findScalarFunctions() {
@@ -284,51 +283,4 @@ public class FunctionLoader {
return sqlFuncs;
}
-
- public static Collection<FunctionDesc> loadFunctions(TajoConf conf) throws IOException, AmbiguousFunctionException {
- List<FunctionDesc> functionList = new ArrayList<>(loadBuiltinFunctions().values());
- List<FunctionDesc> udfs = loadUserDefinedFunctions(conf);
-
- /* NOTE:
- * For built-in functions, it is not done to check duplicates.
- * There are two reasons.
- * Firstly, it could be an useless operation in most of cases because built-in functions are not changed frequently
- * but checking will be done each startup.
- * Secondly, this logic checks duplicate excluding type, but there are already duplicates in built-in functions
- * such as sum with/without 'distinct' feature. So to check duplicates in built-in functions, some other logic is needed.
- * It should be another issue.
- */
- // merge lists and return it.
- return mergeFunctionLists(functionList, udfs);
- }
-
- @SafeVarargs
- static Collection<FunctionDesc> mergeFunctionLists(List<FunctionDesc> ... functionLists)
- throws AmbiguousFunctionException {
-
- Map<Integer, FunctionDesc> funcMap = new HashMap<>();
- List<FunctionDesc> baseFuncList = functionLists[0];
-
- // Build a map with a first list
- for (FunctionDesc desc: baseFuncList) {
- funcMap.put(desc.hashCodeWithoutType(), desc);
- }
-
- // Check duplicates for other function lists(should be UDFs practically)
- for (int i=1; i<functionLists.length; i++) {
- for (FunctionDesc desc: functionLists[i]) {
- if (funcMap.containsKey(desc.hashCodeWithoutType())) {
- FunctionDesc storedDesc = funcMap.get(desc.hashCodeWithoutType());
- if (storedDesc.equalsSignature(desc)) {
- throw new AmbiguousFunctionException(String.format("%s", storedDesc.toString()));
- }
- }
-
- funcMap.put(desc.hashCodeWithoutType(), desc);
- baseFuncList.add(desc);
- }
- }
-
- return baseFuncList;
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core/src/main/java/org/apache/tajo/engine/function/hiveudf/HiveFunctionLoader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/hiveudf/HiveFunctionLoader.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/hiveudf/HiveFunctionLoader.java
new file mode 100644
index 0000000..98ae4cd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/hiveudf/HiveFunctionLoader.java
@@ -0,0 +1,179 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.hiveudf;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.io.Writable;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.FunctionDescBuilder;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.UnsupportedDataTypeException;
+import org.apache.tajo.function.UDFInvocationDesc;
+import org.apache.tajo.plan.util.WritableTypeConverter;
+import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.*;
+
+/**
+ * Discovers Hive UDFs ({@link UDF} subclasses) in jar files and converts them to Tajo function descriptors.
+ */
+public class HiveFunctionLoader {
+  private static final Log LOG = LogFactory.getLog(HiveFunctionLoader.class);
+
+  /**
+   * Loads Hive UDFs from the jar files in the directory configured by {@code HIVE_UDF_DIR}.
+   * Only the local filesystem is supported.
+   *
+   * @param conf Tajo configuration holding the Hive UDF directory
+   * @return descriptors for every convertible {@code evaluate} method found (possibly an empty list), or
+   *         {@link Optional#empty()} if the directory is not configured or does not exist
+   * @throws TajoInternalError if the directory or a jar cannot be read
+   */
+  public static Optional<List<FunctionDesc>> loadHiveUDFs(TajoConf conf) {
+    List<FunctionDesc> funcList = new ArrayList<>();
+    String udfdir = conf.getVar(TajoConf.ConfVars.HIVE_UDF_DIR);
+
+    // new Path("") would throw, so treat a missing setting like a missing directory.
+    if (udfdir == null || udfdir.isEmpty()) {
+      LOG.warn("Hive UDF directory is not configured");
+      return Optional.empty();
+    }
+
+    try {
+      // Currently Hive udf jar must be on local filesystem
+      FileSystem fs = FileSystem.getLocal(conf);
+      Path udfPath = new Path(udfdir);
+
+      if (!fs.isDirectory(udfPath)) {
+        LOG.warn("Hive UDF directory doesn't exist : " + udfdir);
+        return Optional.empty();
+      }
+
+      // Read jar paths from the directory and change them to 'jar:<file url>!/' URLs.
+      // Jars whose URL cannot be built are skipped instead of leaving nulls in the array.
+      URL[] urls = Arrays.stream(fs.listStatus(udfPath, (Path path) -> path.getName().endsWith(".jar")))
+          .map(fstatus -> toJarUrl(fstatus.getPath()))
+          .filter(Objects::nonNull)
+          .toArray(URL[]::new);
+
+      if (urls.length == 0) {
+        LOG.warn("No Hive UDF jar is found in " + udfdir);
+        return Optional.of(funcList);
+      }
+
+      // Extract UDF classes, then build function information per jar so that each function records
+      // the jar that actually contains its class (not simply the first jar of the directory).
+      Set<Class<? extends UDF>> udfClasses = getSubclassesFromJarEntry(urls, UDF.class);
+      for (Map.Entry<String, Set<Class<? extends UDF>>> entry : groupByJar(udfClasses, urls).entrySet()) {
+        buildFunctionsFromUDF(entry.getValue(), funcList, entry.getKey());
+      }
+    } catch (IOException e) {
+      throw new TajoInternalError(e);
+    }
+
+    return Optional.of(funcList);
+  }
+
+  /** Converts a local jar path to a {@code jar:<file url>!/} URL; logs and returns null if that fails. */
+  private static URL toJarUrl(Path jarPath) {
+    try {
+      return new URL("jar:" + jarPath.toUri().toURL() + "!/");
+    } catch (MalformedURLException e) {
+      LOG.warn("Skipping Hive UDF jar with a malformed URL: " + jarPath, e);
+      return null;
+    }
+  }
+
+  /**
+   * Scans the given jars for subclasses of {@code targetCls}.
+   * The class loader is intentionally left open: the returned classes are reflected on afterwards.
+   */
+  private static <T> Set<Class<? extends T>> getSubclassesFromJarEntry(URL[] urls, Class<T> targetCls) {
+    Reflections refl = new Reflections(new ConfigurationBuilder().
+        setUrls(urls).
+        addClassLoader(new URLClassLoader(urls)));
+
+    return refl.getSubTypesOf(targetCls);
+  }
+
+  /** Groups UDF classes by the location string ({@code "jar:" + url.getPath()}) of the jar containing them. */
+  private static Map<String, Set<Class<? extends UDF>>> groupByJar(Set<Class<? extends UDF>> classes, URL[] urls)
+      throws IOException {
+    Map<String, Set<Class<? extends UDF>>> byJar = new LinkedHashMap<>();
+    for (Class<? extends UDF> clazz : classes) {
+      String jarUrl = findJarUrl(clazz, urls);
+      byJar.computeIfAbsent(jarUrl, k -> new HashSet<>()).add(clazz);
+    }
+    return byJar;
+  }
+
+  /**
+   * Returns {@code "jar:" + url.getPath()} for the first jar in {@code urls} that contains the class file of
+   * {@code clazz}. Falls back to the first jar if none matches.
+   */
+  private static String findJarUrl(Class<?> clazz, URL[] urls) throws IOException {
+    String classEntry = clazz.getName().replace('.', '/') + ".class";
+    for (URL url : urls) {
+      // findResource only searches this loader's own URL, i.e. this single jar.
+      try (URLClassLoader jarOnly = new URLClassLoader(new URL[] {url}, null)) {
+        if (jarOnly.findResource(classEntry) != null) {
+          return "jar:" + url.getPath();
+        }
+      }
+    }
+    return "jar:" + urls[0].getPath();
+  }
+
+  /**
+   * Builds Tajo function descriptors for the given Hive UDF classes and appends them to {@code list}.
+   *
+   * <p>Names come from {@code @Description(name = ...)}, where comma-separated entries are synonyms. Without
+   * the annotation, the class name with '.' replaced by '_' is used. One descriptor is added per public
+   * {@code evaluate} method and per name.
+   *
+   * @param classes Hive UDF classes to register
+   * @param list    output list the descriptors are appended to
+   * @param jarurl  location recorded in the invocation descriptor; may be null (as in the unit test)
+   */
+  static void buildFunctionsFromUDF(Set<Class<? extends UDF>> classes, List<FunctionDesc> list, String jarurl) {
+    for (Class<? extends UDF> clazz : classes) {
+      String[] names;
+      String value = null, extended = null;
+
+      Description desc = clazz.getAnnotation(Description.class);
+
+      // Check @Description annotation (if exists)
+      if (desc != null) {
+        names = desc.name().split(",");
+        for (int i = 0; i < names.length; i++) {
+          names[i] = names[i].trim();
+        }
+
+        value = desc.value();
+        extended = desc.extended();
+      } else {
+        names = new String[] {clazz.getName().replace('.', '_')};
+      }
+
+      // actual function descriptor building
+      FunctionDescBuilder builder = new FunctionDescBuilder();
+
+      UDFType type = clazz.getDeclaredAnnotation(UDFType.class);
+      if (type != null) {
+        builder.setDeterministic(type.deterministic());
+      }
+
+      builder.setFunctionType(CatalogProtos.FunctionType.UDF);
+
+      if (value != null) {
+        builder.setDescription(value);
+      }
+
+      if (extended != null) {
+        builder.setExample(extended);
+      }
+
+      UDFInvocationDesc udfInvocation = new UDFInvocationDesc(CatalogProtos.UDFtype.HIVE, clazz.getName(), jarurl, true);
+
+      // verify 'evaluate' method and extract return type and parameter types
+      for (Method method : clazz.getMethods()) {
+        if (method.getName().equals("evaluate")) {
+          registerMethod(method, names, udfInvocation, builder, list);
+        }
+      }
+    }
+  }
+
+  /**
+   * Converts the signature of one {@code evaluate} method to Tajo types and adds one descriptor per name.
+   * Methods with types that cannot be converted are logged and skipped.
+   */
+  @SuppressWarnings("unchecked")
+  private static void registerMethod(Method method, String[] names, UDFInvocationDesc udfInvocation,
+                                     FunctionDescBuilder builder, List<FunctionDesc> list) {
+    try {
+      TajoDataTypes.DataType retType =
+          WritableTypeConverter.convertWritableToTajoType((Class<? extends Writable>) method.getReturnType());
+      TajoDataTypes.DataType[] params = convertToTajoParameterTypes(method.getParameterTypes());
+
+      builder.setReturnType(retType).setParams(params);
+
+      for (String name : names) {
+        builder.setName(name);
+        builder.setUDF(udfInvocation);
+        list.add(builder.build());
+      }
+    } catch (UnsupportedDataTypeException e) {
+      LOG.error(String.format("Hive UDF '%s' is not registered because of unsupported type: %s", names[0], e.getMessage()));
+    }
+  }
+
+  /**
+   * Converts Hive (Writable) parameter classes to Tajo data types.
+   * A zero-argument {@code evaluate()} maps to an empty array rather than null.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static TajoDataTypes.DataType[] convertToTajoParameterTypes(Class[] hiveUDFparams)
+      throws UnsupportedDataTypeException {
+    if (hiveUDFparams == null) {
+      return new TajoDataTypes.DataType[0];
+    }
+
+    TajoDataTypes.DataType[] params = new TajoDataTypes.DataType[hiveUDFparams.length];
+    for (int i = 0; i < hiveUDFparams.length; i++) {
+      params[i] = WritableTypeConverter.convertWritableToTajoType(hiveUDFparams[i]);
+    }
+    return params;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 65fa377..2f046a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tajo.catalog.CatalogServer;
import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.LocalCatalogWrapper;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
@@ -44,6 +45,7 @@ import org.apache.tajo.catalog.store.DerbyStore;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.function.FunctionLoader;
+import org.apache.tajo.engine.function.hiveudf.HiveFunctionLoader;
import org.apache.tajo.exception.*;
import org.apache.tajo.io.AsyncTaskService;
import org.apache.tajo.master.rm.TajoResourceManager;
@@ -74,6 +76,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
+import java.util.*;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
@@ -92,6 +95,7 @@ public class TajoMaster extends CompositeService {
@SuppressWarnings("OctalInteger")
final public static FsPermission SYSTEM_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rwxr-xr-x (0755) */
+ @SuppressWarnings("OctalInteger")
final public static FsPermission SYSTEM_RESOURCE_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
/** rw-r--r-- */
@SuppressWarnings("OctalInteger")
@@ -109,9 +113,6 @@ public class TajoMaster extends CompositeService {
private FileSystem defaultFS;
private Clock clock;
- private Path tajoRootPath;
- private Path wareHousePath;
-
private CatalogServer catalogServer;
private CatalogService catalog;
private GlobalEngine globalEngine;
@@ -177,7 +178,7 @@ public class TajoMaster extends CompositeService {
checkAndInitializeSystemDirectories();
diagnoseTajoMaster();
- catalogServer = new CatalogServer(TablespaceManager.getMetadataProviders(), FunctionLoader.loadFunctions(systemConf));
+ catalogServer = new CatalogServer(TablespaceManager.getMetadataProviders(), loadFunctions());
addIfService(catalogServer);
catalog = new LocalCatalogWrapper(catalogServer, systemConf);
@@ -215,12 +216,48 @@ public class TajoMaster extends CompositeService {
LOG.info("Tajo Master is initialized.");
}
+  /**
+   * Collects every function to register in the catalog: the built-in functions, then the user-defined
+   * functions from {@link FunctionLoader#loadUserDefinedFunctions}, then the Hive UDFs from
+   * {@link HiveFunctionLoader#loadHiveUDFs}.
+   *
+   * @return all collected functions, built-ins first
+   * @throws IOException if loading the functions fails
+   * @throws AmbiguousFunctionException if a UDF's {@code hashCodeWithoutType()} matches that of an
+   *     already collected function
+   */
+  private Collection<FunctionDesc> loadFunctions() throws IOException, AmbiguousFunctionException {
+    List<FunctionDesc> functionList = new ArrayList<>(FunctionLoader.loadBuiltinFunctions().values());
+
+    // Index the built-ins by hashCodeWithoutType() so that a UDF clashing with one of them is detected.
+    Map<Integer, FunctionDesc> funcSet = new HashMap<>();
+    for (FunctionDesc desc : functionList) {
+      funcSet.put(desc.hashCodeWithoutType(), desc);
+    }
+
+    checkUDFduplicateAndMerge(FunctionLoader.loadUserDefinedFunctions(systemConf).orElse(Collections.emptyList()),
+        funcSet, functionList);
+    checkUDFduplicateAndMerge(HiveFunctionLoader.loadHiveUDFs(systemConf).orElse(Collections.emptyList()),
+        funcSet, functionList);
+
+    return functionList;
+  }
+
+  /**
+   * Appends UDFs to {@code funcList}, rejecting any UDF that collides with a function already indexed in
+   * {@code funcSet} (pre-loaded functions or UDFs merged earlier). Every accepted UDF is indexed as well,
+   * so later UDFs are also checked against it.
+   *
+   * @param udfs UDFs to merge
+   * @param funcSet index of already collected functions, keyed by {@link FunctionDesc#hashCodeWithoutType()}
+   * @param funcList list the accepted UDFs are appended to
+   * @throws AmbiguousFunctionException on the first UDF whose key is already present in {@code funcSet}
+   */
+  private static void checkUDFduplicateAndMerge(List<FunctionDesc> udfs, Map<Integer, FunctionDesc> funcSet,
+                                                List<FunctionDesc> funcList) throws AmbiguousFunctionException {
+    for (FunctionDesc desc : udfs) {
+      // NOTE(review): keys are hash codes, so two distinct functions that merely share a hash would be
+      // reported as ambiguous; comparing the signatures themselves would avoid such false positives.
+      if (funcSet.putIfAbsent(desc.hashCodeWithoutType(), desc) != null) {
+        throw new AmbiguousFunctionException(String.format("UDF %s", desc.toString()));
+      }
+      funcList.add(desc);
+    }
+  }
+
private void initSystemMetrics() {
systemMetrics = new TajoSystemMetrics(systemConf, Master.class, getMasterName());
systemMetrics.start();
systemMetrics.register(Master.Cluster.UPTIME, () -> context.getClusterUptime());
-
systemMetrics.register(Master.Cluster.class, new ClusterResourceMetricSet(context));
}
@@ -241,7 +278,7 @@ public class TajoMaster extends CompositeService {
private void checkAndInitializeSystemDirectories() throws IOException {
// Get Tajo root dir
- this.tajoRootPath = TajoConf.getTajoRootDir(systemConf);
+ Path tajoRootPath = TajoConf.getTajoRootDir(systemConf);
LOG.info("Tajo Root Directory: " + tajoRootPath);
// Check and Create Tajo root dir
@@ -266,7 +303,7 @@ public class TajoMaster extends CompositeService {
}
// Get Warehouse dir
- this.wareHousePath = TajoConf.getWarehouseDir(systemConf);
+ Path wareHousePath = TajoConf.getWarehouseDir(systemConf);
LOG.info("Tajo Warehouse dir: " + wareHousePath);
// Check and Create Warehouse dir
@@ -519,7 +556,7 @@ public class TajoMaster extends CompositeService {
}
}
- String getThreadTaskName(long id, String name) {
+ private String getThreadTaskName(long id, String name) {
if (name == null) {
return Long.toString(id);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index faab446..403e06b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -34,6 +34,7 @@ import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.function.FunctionLoader;
+import org.apache.tajo.engine.function.hiveudf.HiveFunctionLoader;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.metrics.Node;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
@@ -196,6 +197,7 @@ public class TajoWorker extends CompositeService {
historyReader = new HistoryReader(workerContext.getWorkerName(), this.systemConf);
FunctionLoader.loadUserDefinedFunctions(systemConf);
+ HiveFunctionLoader.loadHiveUDFs(systemConf);
PythonScriptEngine.initPythonScriptEngineFiles();
@@ -204,7 +206,6 @@ public class TajoWorker extends CompositeService {
private void initWorkerMetrics() {
workerSystemMetrics = new TajoSystemMetrics(systemConf, Node.class, workerContext.getWorkerName());
-
workerSystemMetrics.start();
workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM,
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-plan/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-plan/pom.xml b/tajo-plan/pom.xml
index 32a6df0..f115015 100644
--- a/tajo-plan/pom.xml
+++ b/tajo-plan/pom.xml
@@ -173,6 +173,10 @@
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 6495af3..39caec7 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -460,20 +460,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
*/
private EvalExprNode buildPlanForNoneFromStatement(PlanContext context, Stack<Expr> stack, Projection projection)
throws TajoException {
- LogicalPlan plan = context.plan;
QueryBlock block = context.queryBlock;
- int finalTargetNum = projection.getNamedExprs().size();
List<Target> targets = new ArrayList<>();
- for (int i = 0; i < finalTargetNum; i++) {
- NamedExpr namedExpr = projection.getNamedExprs().get(i);
- EvalNode evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.RELS_ONLY);
- if (namedExpr.hasAlias()) {
- targets.add(new Target(evalNode, namedExpr.getAlias()));
- } else {
- targets.add(new Target(evalNode, context.plan.generateUniqueColumnName(namedExpr.getExpr())));
- }
+ for (NamedExpr namedExpr: projection.getNamedExprs()) {
+ Expr expr = namedExpr.getExpr();
+ EvalNode evalNode = exprAnnotator.createEvalNode(context, expr, NameResolvingMode.RELS_ONLY);
+
+ String alias = namedExpr.hasAlias() ? namedExpr.getAlias() : context.plan.generateUniqueColumnName(expr);
+ targets.add(new Target(evalNode, alias));
}
EvalExprNode evalExprNode = context.queryBlock.getNodeFromExpr(projection);
evalExprNode.setTargets(targets);
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java
index 8db90ec..9a08892 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/FunctionInvoke.java
@@ -47,6 +47,8 @@ public abstract class FunctionInvoke implements Cloneable {
return new ClassBasedScalarFunctionInvoke(desc);
} else if (desc.getInvocation().hasPython()) {
return new PythonFunctionInvoke(desc);
+ } else if (desc.getInvocation().hasHiveUDF()) {
+ return new HiveFunctionInvoke(desc);
} else {
throw new TajoRuntimeException(new UnsupportedException("function invocation '" + desc.getInvocation() + "'"));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java
index 9f20489..d1fb0e9 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/GeneralFunction.java
@@ -47,11 +47,6 @@ public abstract class GeneralFunction extends Function implements GsonObject {
public abstract Datum eval(Tuple params);
- public enum Type {
- AGG,
- GENERAL
- }
-
@Override
public String toJson() {
return CatalogGsonHelper.toJson(this, GeneralFunction.class);
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-plan/src/main/java/org/apache/tajo/plan/function/HiveFunctionInvoke.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/HiveFunctionInvoke.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/HiveFunctionInvoke.java
new file mode 100644
index 0000000..d254499
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/HiveFunctionInvoke.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.function;
+
+import org.apache.hadoop.io.*;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.exception.*;
+import org.apache.tajo.function.UDFInvocationDesc;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.plan.util.WritableTypeConverter;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+
+/**
+ * Invokes a Hive UDF through reflection.
+ *
+ * <p>The class named by the function's {@link UDFInvocationDesc} is loaded from the URL given as its
+ * path, and its public {@code evaluate} overload whose parameters map to the function's Tajo parameter
+ * types is called with the arguments converted to Hadoop {@link Writable}s.
+ */
+public class HiveFunctionInvoke extends FunctionInvoke implements Cloneable {
+  /** UDF instance on which {@link #evalMethod} is called; created in {@link #init}. */
+  private Object instance = null;
+  /** The {@code evaluate} overload matching this function's parameter types; resolved in {@link #init}. */
+  private Method evalMethod = null;
+  // NOTE(review): if FunctionInvoke#clone() is a shallow copy, clones share this buffer and must not be
+  // evaluated concurrently -- confirm.
+  /** Argument buffer reused across {@link #eval} calls, one slot per declared parameter. */
+  private final Writable[] params;
+
+  /**
+   * Creates an invoker for the Hive UDF described by {@code desc}. The UDF class itself is loaded later,
+   * in {@link #init(FunctionInvokeContext)}.
+   *
+   * @param desc descriptor whose invocation names the UDF class and the URL it is loaded from
+   */
+  public HiveFunctionInvoke(FunctionDesc desc) {
+    super(desc);
+    params = new Writable[desc.getParamTypes().length];
+  }
+
+  /**
+   * Loads the UDF class, instantiates it and resolves the {@code evaluate} overload to call.
+   *
+   * @param context invocation context (not used for Hive UDFs)
+   * @throws IOException if the UDF path is not a valid URL
+   * @throws TajoInternalError if the class cannot be loaded or instantiated, or has no matching
+   *     {@code evaluate} method
+   */
+  @Override
+  public void init(FunctionInvokeContext context) throws IOException {
+    UDFInvocationDesc udfDesc = functionDesc.getInvocation().getUDF();
+
+    URL[] urls = new URL[] { new URL(udfDesc.getPath()) };
+    // Delegate to the loader of this class rather than the JVM system loader, so the UDF links against
+    // the same Hadoop/Hive classes (e.g. Writable) that this invoker casts its results to.
+    // The loader is deliberately left open: the UDF may still load classes through it while evaluating.
+    URLClassLoader loader = new URLClassLoader(urls, HiveFunctionInvoke.class.getClassLoader());
+
+    Class<?> udfClass;
+    try {
+      udfClass = loader.loadClass(udfDesc.getName());
+    } catch (ClassNotFoundException e) {
+      throw new TajoInternalError(e);
+    }
+
+    instance = newUdfInstance(udfClass);
+    evalMethod = getEvaluateMethod(functionDesc.getParamTypes(), udfClass);
+  }
+
+  /**
+   * Creates an instance of a UDF class through its public no-argument constructor.
+   *
+   * @throws TajoInternalError if there is no such constructor or the constructor fails
+   */
+  private static Object newUdfInstance(Class<?> clazz) {
+    try {
+      // Ask for the no-arg constructor explicitly: getConstructors() returns constructors in no particular
+      // order (and may be empty), so element [0] could be one that requires arguments.
+      Constructor<?> constructor = clazz.getConstructor();
+      return constructor.newInstance();
+    } catch (NoSuchMethodException | InstantiationException | IllegalAccessException
+        | InvocationTargetException e) {
+      throw new TajoInternalError(e);
+    }
+  }
+
+  /**
+   * Finds the public {@code evaluate} method of {@code clazz} whose parameters match {@code tajoParamTypes}.
+   *
+   * @throws TajoInternalError wrapping an {@link UndefinedFunctionException} if no overload matches
+   */
+  private static Method getEvaluateMethod(TajoDataTypes.DataType[] tajoParamTypes, Class<?> clazz) {
+    for (Method m : clazz.getMethods()) {
+      if (m.getName().equals("evaluate") && checkParamTypes(m.getParameterTypes(), tajoParamTypes)) {
+        return m;
+      }
+    }
+
+    throw new TajoInternalError(
+        new UndefinedFunctionException(String.format("Hive UDF (%s)", clazz.getSimpleName())));
+  }
+
+  /**
+   * Returns whether the Java parameter types of an {@code evaluate} overload map, position by position,
+   * to the given Tajo types.
+   *
+   * <p>A parameter that is not a {@link Writable}, or whose type the converter does not support, makes
+   * the overload a non-match instead of aborting the search; otherwise an unsupported overload could
+   * hide a supported one, because {@link Class#getMethods()} returns methods in no particular order.
+   */
+  private static boolean checkParamTypes(Class<?>[] writableParams, TajoDataTypes.DataType[] tajoParams) {
+    if (writableParams.length != tajoParams.length) {
+      return false;
+    }
+
+    for (int i = 0; i < writableParams.length; i++) {
+      if (!Writable.class.isAssignableFrom(writableParams[i])) {
+        return false;
+      }
+      Class<? extends Writable> writableClass = writableParams[i].asSubclass(Writable.class);
+
+      try {
+        // Compare from the Tajo side so that a null conversion result is a mismatch rather than an NPE.
+        if (!tajoParams[i].equals(WritableTypeConverter.convertWritableToTajoType(writableClass))) {
+          return false;
+        }
+      } catch (UnsupportedDataTypeException e) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Evaluates the UDF on one row of arguments.
+   *
+   * @param tuple argument values, one per declared parameter
+   * @return the UDF result converted to a Tajo datum
+   * @throws TajoInternalError if the reflective call or the result conversion fails
+   */
+  @Override
+  public Datum eval(Tuple tuple) {
+    // Fill exactly the declared slots: looping to tuple.size() could overrun the buffer, or leave the
+    // previous row's values in place, whenever the two sizes differ.
+    for (int i = 0; i < params.length; i++) {
+      params[i] = WritableTypeConverter.convertDatum2Writable(tuple.asDatum(i));
+    }
+
+    try {
+      // Pass the buffer as the argument array itself; the cast also silences javac's inexact-varargs warning.
+      Writable result = (Writable) evalMethod.invoke(instance, (Object[]) params);
+      // NOTE(review): a Hive UDF may return null; confirm convertWritable2Datum maps null to a NULL datum.
+      return WritableTypeConverter.convertWritable2Datum(result);
+    } catch (Exception e) {
+      throw new TajoInternalError(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0f4bbb60/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
index dcaac0c..9db792c 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
@@ -33,7 +33,7 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.function.FunctionInvocation;
import org.apache.tajo.function.FunctionSignature;
import org.apache.tajo.function.FunctionSupplement;
-import org.apache.tajo.function.PythonInvocationDesc;
+import org.apache.tajo.function.UDFInvocationDesc;
import org.apache.tajo.plan.function.FunctionContext;
import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext;
import org.apache.tajo.plan.function.stream.*;
@@ -92,8 +92,8 @@ public class PythonScriptEngine extends TajoScriptEngine {
TajoDataTypes.DataType returnType = getReturnTypes(scalarFuncInfo)[0];
signature = new FunctionSignature(CatalogProtos.FunctionType.UDF, scalarFuncInfo.funcName,
returnType, createParamTypes(scalarFuncInfo.paramNum));
- PythonInvocationDesc invocationDesc = new PythonInvocationDesc(scalarFuncInfo.funcName, path.getPath(), true);
- invocation.setPython(invocationDesc);
+ UDFInvocationDesc invocationDesc = new UDFInvocationDesc(CatalogProtos.UDFtype.PYTHON, scalarFuncInfo.funcName, path.getPath(), true);
+ invocation.setUDF(invocationDesc);
functionDescs.add(new FunctionDesc(signature, invocation, supplement));
} else {
AggFuncInfo aggFuncInfo = (AggFuncInfo) funcInfo;
@@ -101,9 +101,9 @@ public class PythonScriptEngine extends TajoScriptEngine {
TajoDataTypes.DataType returnType = getReturnTypes(aggFuncInfo.getFinalResultInfo)[0];
signature = new FunctionSignature(CatalogProtos.FunctionType.UDA, aggFuncInfo.funcName,
returnType, createParamTypes(aggFuncInfo.evalInfo.paramNum));
- PythonInvocationDesc invocationDesc = new PythonInvocationDesc(aggFuncInfo.className, path.getPath(), false);
+ UDFInvocationDesc invocationDesc = new UDFInvocationDesc(CatalogProtos.UDFtype.PYTHON, aggFuncInfo.className, path.getPath(), false);
- invocation.setPythonAggregation(invocationDesc);
+ invocation.setUDF(invocationDesc);
functionDescs.add(new FunctionDesc(signature, invocation, supplement));
}
}
@@ -283,7 +283,7 @@ public class PythonScriptEngine extends TajoScriptEngine {
private InputStream stderr; // stderr of the process
private final FunctionSignature functionSignature;
- private final PythonInvocationDesc invocationDesc;
+ private final UDFInvocationDesc invocationDesc;
private Schema inSchema;
private Schema outSchema;
private int[] projectionCols;
@@ -299,7 +299,7 @@ public class PythonScriptEngine extends TajoScriptEngine {
throw new IllegalStateException("Function type must be 'python'");
}
functionSignature = functionDesc.getSignature();
- invocationDesc = functionDesc.getInvocation().getPython();
+ invocationDesc = functionDesc.getInvocation().getUDF();
setSchema();
}
@@ -308,7 +308,7 @@ public class PythonScriptEngine extends TajoScriptEngine {
throw new IllegalStateException("Function type must be 'python'");
}
functionSignature = functionDesc.getSignature();
- invocationDesc = functionDesc.getInvocation().getPython();
+ invocationDesc = functionDesc.getInvocation().getUDF();
this.firstPhase = firstPhase;
this.lastPhase = lastPhase;
setSchema();