You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/11/09 14:00:39 UTC
[nifi] branch main updated: NIFI-7954 Wrapping HBase_*_ClientService calls in getUgi().doAs() (#4629)
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 940bc30 NIFI-7954 Wrapping HBase_*_ClientService calls in getUgi().doAs() (#4629)
940bc30 is described below
commit 940bc3056cc59fc54e59c50084d942d667f95a46
Author: tpalfy <53...@users.noreply.github.com>
AuthorDate: Mon Nov 9 15:00:20 2020 +0100
NIFI-7954 Wrapping HBase_*_ClientService calls in getUgi().doAs() (#4629)
* NIFI-7954 Wrapping HBase_*_ClientService calls in getUgi().doAs() and taking care of TGT renewal.
* NIFI-7954 Simplified SecurityUtil.callWithUgi a little.
* NIFI-7954 Simplified SecurityUtil.callWithUgi more.
* NIFI-7954 Removed unnecessary code.
---
.../java/org/apache/nifi/hadoop/SecurityUtil.java | 26 +++
.../nifi/hbase/HBase_1_1_2_ClientService.java | 246 ++++++++++----------
.../apache/nifi/hbase/MockHBaseClientService.java | 23 ++
.../apache/nifi/hbase/HBase_2_ClientService.java | 248 +++++++++++----------
.../apache/nifi/hbase/MockHBaseClientService.java | 35 ++-
5 files changed, 333 insertions(+), 245 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
index a080034..85c0829 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
@@ -19,6 +19,8 @@ package org.apache.nifi.hadoop;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosUser;
import javax.security.auth.Subject;
@@ -146,4 +148,28 @@ public class SecurityUtil {
Validate.notNull(config);
return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
}
+
+    /**
+     * Runs the given action through {@code ugi.doAs(action)} and returns its result.
+     *
+     * @param ugi the UserGroupInformation the action is executed with
+     * @param action the privileged action to run
+     * @param <T> the type of value produced by the action
+     * @return the value returned by {@code ugi.doAs(action)}
+     * @throws IOException if {@code doAs} throws an IOException, or if it throws an
+     *         InterruptedException (which is attached as the cause)
+     */
+    public static <T> T callWithUgi(UserGroupInformation ugi, PrivilegedExceptionAction<T> action) throws IOException {
+        try {
+            return ugi.doAs(action);
+        } catch (InterruptedException e) {
+            // Converting to IOException would otherwise lose the interruption; restore the flag
+            // so code further up the stack can still observe that the thread was interrupted.
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Asks the given Kerberos user to check its TGT and relogin by delegating to
+     * {@code kerberosUser.checkTGTAndRelogin()}. When no Kerberos user is supplied,
+     * only a debug message is logged.
+     *
+     * @param log the logger that receives the trace/debug messages
+     * @param kerberosUser the user whose TGT should be checked; may be null
+     * @throws ProcessException if the relogin attempt fails with a LoginException
+     */
+    public static void checkTGTAndRelogin(ComponentLog log, KerberosUser kerberosUser) {
+        log.trace("getting UGI instance");
+
+        // Nothing to refresh without a KerberosUser.
+        if (kerberosUser == null) {
+            log.debug("kerberosUser was null, will not refresh TGT with KerberosUser");
+            return;
+        }
+
+        log.debug("kerberosUser is " + kerberosUser);
+        try {
+            log.debug("checking TGT on kerberosUser " + kerberosUser);
+            kerberosUser.checkTGTAndRelogin();
+        } catch (LoginException e) {
+            throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
+        }
+    }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index f0c4d7e..6e46c3e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -62,7 +62,6 @@ import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.validate.ConfigFilesValidator;
import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
@@ -71,7 +70,6 @@ import org.apache.nifi.security.krb.KerberosUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -461,47 +459,55 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- // Create one Put per row....
- final Map<String, List<PutColumn>> sorted = new HashMap<>();
- final List<Put> newPuts = new ArrayList<>();
-
- for (final PutFlowFile putFlowFile : puts) {
- final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
- List<PutColumn> columns = sorted.get(rowKeyString);
- if (columns == null) {
- columns = new ArrayList<>();
- sorted.put(rowKeyString, columns);
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ // Create one Put per row....
+ final Map<String, List<PutColumn>> sorted = new HashMap<>();
+ final List<Put> newPuts = new ArrayList<>();
+
+ for (final PutFlowFile putFlowFile : puts) {
+ final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
+ List<PutColumn> columns = sorted.get(rowKeyString);
+ if (columns == null) {
+ columns = new ArrayList<>();
+ sorted.put(rowKeyString, columns);
+ }
+
+ columns.addAll(putFlowFile.getColumns());
}
- columns.addAll(putFlowFile.getColumns());
- }
+ for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
+ newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
+ }
- for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
- newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
+ table.put(newPuts);
}
-
- table.put(newPuts);
- }
+ return null;
+ });
}
@Override
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- table.put(buildPuts(rowId, new ArrayList(columns)));
- }
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ table.put(buildPuts(rowId, new ArrayList(columns)));
+ }
+ return null;
+ });
}
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- Put put = new Put(rowId);
- put.addColumn(
- column.getColumnFamily(),
- column.getColumnQualifier(),
- column.getBuffer());
- return table.checkAndPut(rowId, family, qualifier, value, put);
- }
+ return SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ Put put = new Put(rowId);
+ put.addColumn(
+ column.getColumnFamily(),
+ column.getColumnQualifier(),
+ column.getBuffer());
+ return table.checkAndPut(rowId, family, qualifier, value, put);
+ }
+ });
}
@Override
@@ -511,13 +517,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- Delete delete = new Delete(rowId);
- if (!StringUtils.isEmpty(visibilityLabel)) {
- delete.setCellVisibility(new CellVisibility(visibilityLabel));
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ Delete delete = new Delete(rowId);
+ if (!StringUtils.isEmpty(visibilityLabel)) {
+ delete.setCellVisibility(new CellVisibility(visibilityLabel));
+ }
+ table.delete(delete);
}
- table.delete(delete);
- }
+ return null;
+ });
}
@Override
@@ -554,9 +563,12 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
private void batchDelete(String tableName, List<Delete> deletes) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- table.delete(deletes);
- }
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ table.delete(deletes);
+ }
+ return null;
+ });
}
@Override
@@ -567,64 +579,70 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, List<String> visibilityLabels, ResultHandler handler) throws IOException {
- Filter filter = null;
- if (!StringUtils.isBlank(filterExpression)) {
- ParseFilter parseFilter = new ParseFilter();
- filter = parseFilter.parseFilterString(filterExpression);
- }
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ Filter filter = null;
+ if (!StringUtils.isBlank(filterExpression)) {
+ ParseFilter parseFilter = new ParseFilter();
+ filter = parseFilter.parseFilterString(filterExpression);
+ }
- try (final Table table = connection.getTable(TableName.valueOf(tableName));
- final ResultScanner scanner = getResults(table, columns, filter, minTime, visibilityLabels)) {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName));
+ final ResultScanner scanner = getResults(table, columns, filter, minTime, visibilityLabels)) {
- for (final Result result : scanner) {
- final byte[] rowKey = result.getRow();
- final Cell[] cells = result.rawCells();
+ for (final Result result : scanner) {
+ final byte[] rowKey = result.getRow();
+ final Cell[] cells = result.rawCells();
- if (cells == null) {
- continue;
- }
+ if (cells == null) {
+ continue;
+ }
- // convert HBase cells to NiFi cells
- final ResultCell[] resultCells = new ResultCell[cells.length];
- for (int i=0; i < cells.length; i++) {
- final Cell cell = cells[i];
- final ResultCell resultCell = getResultCell(cell);
- resultCells[i] = resultCell;
- }
+ // convert HBase cells to NiFi cells
+ final ResultCell[] resultCells = new ResultCell[cells.length];
+ for (int i = 0; i < cells.length; i++) {
+ final Cell cell = cells[i];
+ final ResultCell resultCell = getResultCell(cell);
+ resultCells[i] = resultCell;
+ }
- // delegate to the handler
- handler.handle(rowKey, resultCells);
+ // delegate to the handler
+ handler.handle(rowKey, resultCells);
+ }
}
- }
+ return null;
+ });
}
@Override
public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, List<String> authorizations, final ResultHandler handler)
throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName));
- final ResultScanner scanner = getResults(table, startRow, endRow, columns, authorizations)) {
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName));
+ final ResultScanner scanner = getResults(table, startRow, endRow, columns, authorizations)) {
- for (final Result result : scanner) {
- final byte[] rowKey = result.getRow();
- final Cell[] cells = result.rawCells();
+ for (final Result result : scanner) {
+ final byte[] rowKey = result.getRow();
+ final Cell[] cells = result.rawCells();
- if (cells == null) {
- continue;
- }
+ if (cells == null) {
+ continue;
+ }
- // convert HBase cells to NiFi cells
- final ResultCell[] resultCells = new ResultCell[cells.length];
- for (int i=0; i < cells.length; i++) {
- final Cell cell = cells[i];
- final ResultCell resultCell = getResultCell(cell);
- resultCells[i] = resultCell;
- }
+ // convert HBase cells to NiFi cells
+ final ResultCell[] resultCells = new ResultCell[cells.length];
+ for (int i = 0; i < cells.length; i++) {
+ final Cell cell = cells[i];
+ final ResultCell resultCell = getResultCell(cell);
+ resultCells[i] = resultCell;
+ }
- // delegate to the handler
- handler.handle(rowKey, resultCells);
+ // delegate to the handler
+ handler.handle(rowKey, resultCells);
+ }
}
- }
+ return null;
+ });
}
@Override
@@ -632,37 +650,40 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
final Boolean blockCache, final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName));
- final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
- timerangeMax, limitRows, isReversed, blockCache, columns, visibilityLabels)) {
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName));
+ final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
+ timerangeMax, limitRows, isReversed, blockCache, columns, visibilityLabels)) {
- int cnt = 0;
- final int lim = limitRows != null ? limitRows : 0;
- for (final Result result : scanner) {
+ int cnt = 0;
+ final int lim = limitRows != null ? limitRows : 0;
+ for (final Result result : scanner) {
- if (lim > 0 && ++cnt > lim){
- break;
- }
+ if (lim > 0 && ++cnt > lim) {
+ break;
+ }
- final byte[] rowKey = result.getRow();
- final Cell[] cells = result.rawCells();
+ final byte[] rowKey = result.getRow();
+ final Cell[] cells = result.rawCells();
- if (cells == null) {
- continue;
- }
+ if (cells == null) {
+ continue;
+ }
- // convert HBase cells to NiFi cells
- final ResultCell[] resultCells = new ResultCell[cells.length];
- for (int i = 0; i < cells.length; i++) {
- final Cell cell = cells[i];
- final ResultCell resultCell = getResultCell(cell);
- resultCells[i] = resultCell;
- }
+ // convert HBase cells to NiFi cells
+ final ResultCell[] resultCells = new ResultCell[cells.length];
+ for (int i = 0; i < cells.length; i++) {
+ final Cell cell = cells[i];
+ final ResultCell resultCell = getResultCell(cell);
+ resultCells[i] = resultCell;
+ }
- // delegate to the handler
- handler.handle(rowKey, resultCells);
+ // delegate to the handler
+ handler.handle(rowKey, resultCells);
+ }
}
- }
+ return null;
+ });
}
//
@@ -868,20 +889,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
UserGroupInformation getUgi() {
- getLogger().trace("getting UGI instance");
- if (kerberosUserReference.get() != null) {
- // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
- KerberosUser kerberosUser = kerberosUserReference.get();
- getLogger().debug("kerberosUser is " + kerberosUser);
- try {
- getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
- kerberosUser.checkTGTAndRelogin();
- } catch (LoginException e) {
- throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
- }
- } else {
- getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
- }
+ SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUserReference.get());
return ugi;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index b8327c5..59f88a8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hbase.put.PutColumn;
@@ -33,6 +34,7 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -40,6 +42,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
@@ -52,6 +57,19 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
private Map<String, Result> results = new HashMap<>();
private KerberosProperties kerberosProperties;
private boolean allowExplicitKeytab;
+ private UserGroupInformation mockUgi;
+
+ // Instance initializer: creates a mocked UserGroupInformation whose doAs(...) directly
+ // invokes the supplied PrivilegedExceptionAction and returns its result.
+ {
+ mockUgi = mock(UserGroupInformation.class);
+ try {
+ doAnswer(invocation -> {
+ PrivilegedExceptionAction<?> action = invocation.getArgument(0);
+ return action.run();
+ }).when(mockUgi).doAs(any(PrivilegedExceptionAction.class));
+ } catch (Exception e) {
+ // Stubbing doAs(...) is a call to a method that declares checked exceptions; rethrow unchecked.
+ throw new RuntimeException(e);
+ }
+ }
public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) {
this(table, family, kerberosProperties, false);
@@ -209,4 +227,9 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
boolean isAllowExplicitKeytab() {
return allowExplicitKeytab;
}
+
+ // Returns the mocked UGI; unlike the overridden method, this does not call
+ // SecurityUtil.checkTGTAndRelogin before returning.
+ @Override
+ UserGroupInformation getUgi() {
+ return mockUgi;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
index eaad057..387aa1c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
@@ -62,7 +62,6 @@ import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.validate.ConfigFilesValidator;
import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
@@ -71,7 +70,6 @@ import org.apache.nifi.security.krb.KerberosUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -460,47 +458,56 @@ public class HBase_2_ClientService extends AbstractControllerService implements
@Override
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- // Create one Put per row....
- final Map<String, List<PutColumn>> sorted = new HashMap<>();
- final List<Put> newPuts = new ArrayList<>();
-
- for (final PutFlowFile putFlowFile : puts) {
- final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
- List<PutColumn> columns = sorted.get(rowKeyString);
- if (columns == null) {
- columns = new ArrayList<>();
- sorted.put(rowKeyString, columns);
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ // Create one Put per row....
+ final Map<String, List<PutColumn>> sorted = new HashMap<>();
+ final List<Put> newPuts = new ArrayList<>();
+
+ for (final PutFlowFile putFlowFile : puts) {
+ final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
+ List<PutColumn> columns = sorted.get(rowKeyString);
+ if (columns == null) {
+ columns = new ArrayList<>();
+ sorted.put(rowKeyString, columns);
+ }
+
+ columns.addAll(putFlowFile.getColumns());
}
- columns.addAll(putFlowFile.getColumns());
- }
+ for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
+ newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
+ }
- for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
- newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
+ table.put(newPuts);
}
- table.put(newPuts);
- }
+ return null;
+ });
}
@Override
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- table.put(buildPuts(rowId, new ArrayList(columns)));
- }
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ table.put(buildPuts(rowId, new ArrayList(columns)));
+ }
+ return null;
+ });
}
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- Put put = new Put(rowId);
- put.addColumn(
- column.getColumnFamily(),
- column.getColumnQualifier(),
- column.getBuffer());
- return table.checkAndPut(rowId, family, qualifier, value, put);
- }
+ return SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ Put put = new Put(rowId);
+ put.addColumn(
+ column.getColumnFamily(),
+ column.getColumnQualifier(),
+ column.getBuffer());
+ return table.checkAndPut(rowId, family, qualifier, value, put);
+ }
+ });
}
@Override
@@ -510,13 +517,16 @@ public class HBase_2_ClientService extends AbstractControllerService implements
@Override
public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- Delete delete = new Delete(rowId);
- if (!StringUtils.isEmpty(visibilityLabel)) {
- delete.setCellVisibility(new CellVisibility(visibilityLabel));
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ Delete delete = new Delete(rowId);
+ if (!StringUtils.isEmpty(visibilityLabel)) {
+ delete.setCellVisibility(new CellVisibility(visibilityLabel));
+ }
+ table.delete(delete);
}
- table.delete(delete);
- }
+ return null;
+ });
}
@Override
@@ -553,9 +563,12 @@ public class HBase_2_ClientService extends AbstractControllerService implements
}
private void batchDelete(String tableName, List<Delete> deletes) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- table.delete(deletes);
- }
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+ table.delete(deletes);
+ }
+ return null;
+ });
}
@Override
@@ -566,64 +579,70 @@ public class HBase_2_ClientService extends AbstractControllerService implements
@Override
public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, List<String> visibilityLabels, ResultHandler handler) throws IOException {
- Filter filter = null;
- if (!StringUtils.isBlank(filterExpression)) {
- ParseFilter parseFilter = new ParseFilter();
- filter = parseFilter.parseFilterString(filterExpression);
- }
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ Filter filter = null;
+ if (!StringUtils.isBlank(filterExpression)) {
+ ParseFilter parseFilter = new ParseFilter();
+ filter = parseFilter.parseFilterString(filterExpression);
+ }
- try (final Table table = connection.getTable(TableName.valueOf(tableName));
- final ResultScanner scanner = getResults(table, columns, filter, minTime, visibilityLabels)) {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName));
+ final ResultScanner scanner = getResults(table, columns, filter, minTime, visibilityLabels)) {
- for (final Result result : scanner) {
- final byte[] rowKey = result.getRow();
- final Cell[] cells = result.rawCells();
+ for (final Result result : scanner) {
+ final byte[] rowKey = result.getRow();
+ final Cell[] cells = result.rawCells();
- if (cells == null) {
- continue;
- }
+ if (cells == null) {
+ continue;
+ }
- // convert HBase cells to NiFi cells
- final ResultCell[] resultCells = new ResultCell[cells.length];
- for (int i=0; i < cells.length; i++) {
- final Cell cell = cells[i];
- final ResultCell resultCell = getResultCell(cell);
- resultCells[i] = resultCell;
- }
+ // convert HBase cells to NiFi cells
+ final ResultCell[] resultCells = new ResultCell[cells.length];
+ for (int i = 0; i < cells.length; i++) {
+ final Cell cell = cells[i];
+ final ResultCell resultCell = getResultCell(cell);
+ resultCells[i] = resultCell;
+ }
- // delegate to the handler
- handler.handle(rowKey, resultCells);
+ // delegate to the handler
+ handler.handle(rowKey, resultCells);
+ }
}
- }
+ return null;
+ });
}
@Override
public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, List<String> authorizations, final ResultHandler handler)
throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName));
- final ResultScanner scanner = getResults(table, startRow, endRow, columns, authorizations)) {
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName));
+ final ResultScanner scanner = getResults(table, startRow, endRow, columns, authorizations)) {
- for (final Result result : scanner) {
- final byte[] rowKey = result.getRow();
- final Cell[] cells = result.rawCells();
+ for (final Result result : scanner) {
+ final byte[] rowKey = result.getRow();
+ final Cell[] cells = result.rawCells();
- if (cells == null) {
- continue;
- }
+ if (cells == null) {
+ continue;
+ }
- // convert HBase cells to NiFi cells
- final ResultCell[] resultCells = new ResultCell[cells.length];
- for (int i=0; i < cells.length; i++) {
- final Cell cell = cells[i];
- final ResultCell resultCell = getResultCell(cell);
- resultCells[i] = resultCell;
- }
+ // convert HBase cells to NiFi cells
+ final ResultCell[] resultCells = new ResultCell[cells.length];
+ for (int i = 0; i < cells.length; i++) {
+ final Cell cell = cells[i];
+ final ResultCell resultCell = getResultCell(cell);
+ resultCells[i] = resultCell;
+ }
- // delegate to the handler
- handler.handle(rowKey, resultCells);
+ // delegate to the handler
+ handler.handle(rowKey, resultCells);
+ }
}
- }
+ return null;
+ });
}
@Override
@@ -631,37 +650,40 @@ public class HBase_2_ClientService extends AbstractControllerService implements
final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
final Boolean blockCache, final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException {
- try (final Table table = connection.getTable(TableName.valueOf(tableName));
- final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
- timerangeMax, limitRows, isReversed, blockCache, columns, visibilityLabels)) {
+ SecurityUtil.callWithUgi(getUgi(), () -> {
+ try (final Table table = connection.getTable(TableName.valueOf(tableName));
+ final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
+ timerangeMax, limitRows, isReversed, blockCache, columns, visibilityLabels)) {
- int cnt = 0;
- final int lim = limitRows != null ? limitRows : 0;
- for (final Result result : scanner) {
+ int cnt = 0;
+ final int lim = limitRows != null ? limitRows : 0;
+ for (final Result result : scanner) {
- if (lim > 0 && ++cnt > lim){
- break;
- }
+ if (lim > 0 && ++cnt > lim) {
+ break;
+ }
- final byte[] rowKey = result.getRow();
- final Cell[] cells = result.rawCells();
+ final byte[] rowKey = result.getRow();
+ final Cell[] cells = result.rawCells();
- if (cells == null) {
- continue;
- }
+ if (cells == null) {
+ continue;
+ }
- // convert HBase cells to NiFi cells
- final ResultCell[] resultCells = new ResultCell[cells.length];
- for (int i = 0; i < cells.length; i++) {
- final Cell cell = cells[i];
- final ResultCell resultCell = getResultCell(cell);
- resultCells[i] = resultCell;
- }
+ // convert HBase cells to NiFi cells
+ final ResultCell[] resultCells = new ResultCell[cells.length];
+ for (int i = 0; i < cells.length; i++) {
+ final Cell cell = cells[i];
+ final ResultCell resultCell = getResultCell(cell);
+ resultCells[i] = resultCell;
+ }
- // delegate to the handler
- handler.handle(rowKey, resultCells);
+ // delegate to the handler
+ handler.handle(rowKey, resultCells);
+ }
}
- }
+ return null;
+ });
}
//
@@ -866,22 +888,8 @@ public class HBase_2_ClientService extends AbstractControllerService implements
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
}
- UserGroupInformation getUgi() {
- getLogger().trace("getting UGI instance");
- if (kerberosUserReference.get() != null) {
- // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
- KerberosUser kerberosUser = kerberosUserReference.get();
- getLogger().debug("kerberosUser is " + kerberosUser);
- try {
- getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
- kerberosUser.checkTGTAndRelogin();
- } catch (LoginException e) {
- throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
- }
- } else {
- getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
- }
+ UserGroupInformation getUgi() throws IOException {
+ SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUserReference.get());
return ugi;
}
-
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index b4d75fa..b1be813 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hbase.put.PutColumn;
@@ -33,6 +34,7 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -40,6 +42,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
@@ -52,6 +57,19 @@ public class MockHBaseClientService extends HBase_2_ClientService {
private Map<String, Result> results = new HashMap<>();
private KerberosProperties kerberosProperties;
private boolean allowExplicitKeytab;
+ private UserGroupInformation mockUgi;
+
+ {
+ mockUgi = mock(UserGroupInformation.class);
+ try {
+ doAnswer(invocation -> {
+ PrivilegedExceptionAction<?> action = invocation.getArgument(0);
+ return action.run();
+ }).when(mockUgi).doAs(any(PrivilegedExceptionAction.class));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) {
this(table, family, kerberosProperties, false);
@@ -79,7 +97,7 @@ public class MockHBaseClientService extends HBase_2_ClientService {
final Cell[] cellArray = new Cell[cells.size()];
int i = 0;
for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
- final Cell cell = Mockito.mock(Cell.class);
+ final Cell cell = mock(Cell.class);
when(cell.getRowArray()).thenReturn(rowArray);
when(cell.getRowOffset()).thenReturn(0);
when(cell.getRowLength()).thenReturn((short) rowArray.length);
@@ -106,7 +124,7 @@ public class MockHBaseClientService extends HBase_2_ClientService {
cellArray[i++] = cell;
}
- final Result result = Mockito.mock(Result.class);
+ final Result result = mock(Result.class);
when(result.getRow()).thenReturn(rowArray);
when(result.rawCells()).thenReturn(cellArray);
results.put(rowKey, result);
@@ -179,28 +197,28 @@ public class MockHBaseClientService extends HBase_2_ClientService {
}
protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels) throws IOException {
- final ResultScanner scanner = Mockito.mock(ResultScanner.class);
+ final ResultScanner scanner = mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
@Override
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime, List<String> labels) throws IOException {
- final ResultScanner scanner = Mockito.mock(ResultScanner.class);
+ final ResultScanner scanner = mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
- final ResultScanner scanner = Mockito.mock(ResultScanner.class);
+ final ResultScanner scanner = mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
@Override
protected Connection createConnection(ConfigurationContext context) throws IOException {
- Connection connection = Mockito.mock(Connection.class);
+ Connection connection = mock(Connection.class);
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
return connection;
}
@@ -209,4 +227,9 @@ public class MockHBaseClientService extends HBase_2_ClientService {
boolean isAllowExplicitKeytab() {
return allowExplicitKeytab;
}
+
+ @Override
+ UserGroupInformation getUgi() throws IOException {
+ return mockUgi;
+ }
}