Posted to dev@nifi.apache.org by Great Info <gu...@gmail.com> on 2020/01/21 17:22:50 UTC

NiFi Invoke Http processor with oauth2

Hi,

I am using the NiFi InvokeHTTP processor to send data to a third-party
server. The server is protected with OAuth2, so I first need to log in and
obtain a JWT token. InvokeHTTP then needs to add the token to the
*'Authorization:"JWTtoken"'* header before sending to the third-party server,
and refresh the token when it expires.
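
To make the header requirement above concrete, here is a minimal sketch
using the JDK 11 java.net.http API rather than NiFi itself. It assumes the
server expects the common "Bearer" scheme; the class name, the method name
and the URL are hypothetical, and the request is only built, never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

public class BearerRequestSketch {

    // Builds (but does not send) a POST request carrying the token in the
    // Authorization header. The "Bearer " prefix is an assumption; use
    // whatever scheme the third-party server actually expects.
    static HttpRequest withToken(String url, String token, String body) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Bearer " + token)
                .header("Content-Type", "application/json")
                .POST(BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical endpoint and token, for illustration only.
        HttpRequest request = withToken("https://third-party.example/api", "JWTtoken", "{}");
        System.out.println(request.headers().firstValue("Authorization").orElse("<missing>"));
    }
}
```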

When I went through the InvokeHTTP processor, I could not find any option
for an OAuth2 flow.

While searching I found a few links. This gist
<https://gist.github.com/pvillard31/c2926fdcff57b017a771d2d1acdb5cba> shows
a flow that obtains a token, but it is very large and adds many more NiFi
processors. This pull request <https://github.com/apache/nifi/pull/2085>
suggests there is a Jira ticket for the same feature, but I could not find
it in recent NiFi releases. Is there a better way to handle OAuth2 when
sending data to a third-party server?

Regards

Indra

Re: NiFi Invoke Http processor with oauth2

Posted by Juan Pablo Gardella <ga...@gmail.com>.
*removed dev list*

Hi, I wrote a processor plus a simple token service to do token-based
authentication. I hope it helps. You will probably have to adjust the token
service to match the JSON that contains the token. The processor and the
service are below. Note that I am using it in a standalone NiFi. For
clusters, you may have to externalize where the token is stored if you don't
want each node to request its own token.


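*TokenAttributeUpdater:*
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.auto.service.AutoService;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EventDriven
@SideEffectFree
@InputRequirement(Requirement.INPUT_REQUIRED)
@SupportsBatching
@AutoService(Processor.class)
@WritesAttribute(attribute = "token", description = "Token to be used in the HTTP header")
public class TokenAttributeUpdater extends AbstractProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(TokenAttributeUpdater.class);

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success").description("").build();
    public static final Relationship REL_FAIL = new Relationship.Builder()
            .name("fail").description("").build();

    public static final PropertyDescriptor TOKEN_SERVICE = new PropertyDescriptor.Builder()
            .name("Token service")
            .identifiesControllerService(TokenService.class)
            .required(true)
            .build();

    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;

    static {
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(TOKEN_SERVICE);
        PROPERTIES = Collections.unmodifiableList(props);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAIL);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final FlowFile file = session.get();
        if (file != null) {
            try {
                doProcessFile(file, session, context);
            } catch (final Exception e) {
                // Record the failure on the flow file and route it to "fail".
                LOGGER.error("Failed to obtain token: {}", e.getMessage());
                FlowFile flowFileToWrite = session.putAttribute(file, "error",
                        e.getClass() + ":" + e.getMessage());
                session.transfer(flowFileToWrite, REL_FAIL);
            }
        }
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    // Asks the token service for a valid token and stores it in the "token" attribute.
    private void doProcessFile(final FlowFile flowFile, ProcessSession session,
            ProcessContext context) {
        FlowFile flowFileToWrite = session.putAttribute(flowFile, "token",
                context.getProperty(TOKEN_SERVICE).asControllerService(TokenService.class).getToken());
        session.transfer(flowFileToWrite, REL_SUCCESS);
    }

}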

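*TokenService:*
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.processor.exception.ProcessException;

public interface TokenService extends ControllerService {
    // Returns a token that is still valid, requesting a new one if needed.
    String getToken() throws ProcessException;
}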

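*TokenServiceImpl:*
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import com.google.auto.service.AutoService;
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@CapabilityDescription("Token handler.")
@Tags({"token", "credentials"})
@AutoService(ControllerService.class)
public class TokenServiceImpl extends AbstractControllerService implements TokenService {

    private static final class Token {
        // How many seconds before the real expiry the token is already treated as expired.
        private static final long SECONDS_WINDOWS = 60L;
        private final String token;
        /**
         * When the token expires.
         */
        private final Instant expires_in;

        public Token(String token, int expires_in) {
            this.token = Objects.requireNonNull(token, "token cannot be null");
            if (expires_in <= 0) {
                throw new IllegalArgumentException("expires_in should be greater than 0");
            }
            this.expires_in = Instant.now().plusSeconds(expires_in);
        }

        public String getToken() {
            if (isExpired()) {
                throw new IllegalStateException("Token expired");
            }
            return token;
        }

        public boolean isExpired() {
            return Instant.now().plusSeconds(SECONDS_WINDOWS).isAfter(expires_in);
        }
    }

    // TODO: This class should match the JSON that contains the token and its expiry.
    private static final class TokenResponse {
        String token;
        int expires_in;
    }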

    public static final PropertyDescriptor LOGIN_URI = new PropertyDescriptor.Builder()
            .name("login_uri")
            .description("Login URI")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.URI_VALIDATOR)
            .required(true)
            .build();

    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
            .name("user")
            .description("User")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .required(true)
            .build();

    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
            .name("password")
            .description("password")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .sensitive(true)
            .required(true)
            .build();

    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
            .name("connection.timeout")
            .description("Connection timeout")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("10 sec")
            .required(true)
            .build();

    // Login request body. Note: the values are inserted as-is, without JSON escaping.
    private static final String JSON_TEMPLATE = "{\"username\": \"%s\",\"password\": \"%s\"}";
    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Gson GSON = new Gson();
    private static final Logger LOGGER = LoggerFactory.getLogger(TokenServiceImpl.class);
    public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

    static {
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(USER);
        props.add(PASSWORD);
        props.add(LOGIN_URI);
        props.add(CONNECTION_TIMEOUT);
        PROPERTIES = Collections.unmodifiableList(props);
    }

    // Cached token, guarded by the lock of the synchronized getToken().
    private Token token;

    @Override
    public synchronized String getToken() throws ProcessException {
        LOGGER.info("Token requested...");
        if (token == null || token.isExpired()) {
            try {
                Stopwatch sw = Stopwatch.createStarted();
                LOGGER.info("Token will be generated...");
                token = createToken();
                LOGGER.info("Token was generated successfully in {}", sw);
            } catch (IOException | InterruptedException e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status for the calling thread.
                    Thread.currentThread().interrupt();
                }
                LOGGER.error("Cannot obtain the token: {}", e.getMessage(), e);
                throw new ProcessException("Failed to obtain the token", e);
            }
        }
        LOGGER.info("Token will be returned");
        return token.getToken();
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    private Token createToken() throws IOException, InterruptedException {
        // Java 8 and earlier report versions such as "1.8.0_x". Anything else is
        // assumed to be Java 11 or later, where java.net.http is available.
        if (!System.getProperty("java.version").startsWith("1.")) {
            return getTokenUsingJava11();
        } else {
            return getTokenUsingOkHttp(getConfigurationContext());
        }
    }

    private Duration getConnectionTimeOut() {
        return Duration.ofMillis(Double.valueOf(FormatUtils.getPreciseTimeDuration(
                getConfigurationContext().getProperty(CONNECTION_TIMEOUT)
                        .evaluateAttributeExpressions().getValue().trim(),
                TimeUnit.MILLISECONDS)).longValue());
    }

    private String getLoginJsonBody() {
        return String.format(JSON_TEMPLATE,
                getConfigurationContext().getProperty(USER)
                        .evaluateAttributeExpressions().getValue(),
                getConfigurationContext().getProperty(PASSWORD)
                        .evaluateAttributeExpressions().getValue());
    }

    private URI getLoginUri() {
        return URI.create(getConfigurationContext().getProperty(LOGIN_URI)
                .evaluateAttributeExpressions().getValue().trim());
    }

    private Token getTokenUsingJava11() throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(getLoginUri())
                .POST(BodyPublishers.ofString(getLoginJsonBody()))
                .header("Content-Type", "application/json")
                .build();
        HttpResponse<String> response = HttpClient.newBuilder()
                .connectTimeout(getConnectionTimeOut())
                .build()
                .send(request, BodyHandlers.ofString());
        LOGGER.info("Response status code: {}", response.statusCode());
        LOGGER.info("Response headers: {}", response.headers());
        TokenResponse tokenResponse = GSON.fromJson(response.body(), TokenResponse.class);
        return new Token(tokenResponse.token, tokenResponse.expires_in);
    }

    private Token getTokenUsingOkHttp(ConfigurationContext context) throws IOException {
        OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
        okHttpClientBuilder.connectTimeout(
                context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
                        .asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
                TimeUnit.MILLISECONDS);
        okHttpClientBuilder.readTimeout(
                context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions()
                        .asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
                TimeUnit.MILLISECONDS);
        RequestBody body = RequestBody.create(getLoginJsonBody(), JSON);
        Request request = new Request.Builder().url(getLoginUri().toString()).post(body).build();
        try (Response response = okHttpClientBuilder.build().newCall(request).execute()) {
            TokenResponse tokenResponse = GSON.fromJson(response.body().string(),
                    TokenResponse.class);
            return new Token(tokenResponse.token, tokenResponse.expires_in);
        }
    }
}
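
The caching rule used by getToken() and Token.isExpired() above (reuse the
token until it is within a safety window of expiring, then fetch a new one)
can be tried outside NiFi. The following is a minimal sketch of that rule,
not the service itself: TokenCache, Fetched, and the injected Clock and
Supplier are hypothetical names, introduced so the behaviour can be checked
without a real login endpoint.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.function.Supplier;

/** Caches a token and fetches a new one shortly before the old one expires. */
public final class TokenCache {

    /** Token value plus its lifetime in seconds (hypothetical response shape). */
    public static final class Fetched {
        final String value;
        final long expiresInSeconds;

        public Fetched(String value, long expiresInSeconds) {
            this.value = Objects.requireNonNull(value, "value cannot be null");
            if (expiresInSeconds <= 0) {
                throw new IllegalArgumentException("expiresInSeconds should be greater than 0");
            }
            this.expiresInSeconds = expiresInSeconds;
        }
    }

    private final Supplier<Fetched> fetcher; // stands in for the login HTTP call
    private final Clock clock;               // injectable so time can be controlled
    private final Duration safetyWindow;     // treat the token as expired this early
    private String value;
    private Instant expiresAt;

    public TokenCache(Supplier<Fetched> fetcher, Clock clock, Duration safetyWindow) {
        this.fetcher = Objects.requireNonNull(fetcher);
        this.clock = Objects.requireNonNull(clock);
        this.safetyWindow = Objects.requireNonNull(safetyWindow);
    }

    /** Returns the cached token, fetching a new one if none exists or it is about to expire. */
    public synchronized String get() {
        Instant now = clock.instant();
        if (value == null || now.plus(safetyWindow).isAfter(expiresAt)) {
            Fetched fetched = fetcher.get();
            value = fetched.value;
            expiresAt = now.plusSeconds(fetched.expiresInSeconds);
        }
        return value;
    }

    public static void main(String[] args) {
        // Demo with a fake fetcher that always returns the same token.
        TokenCache cache = new TokenCache(
                () -> new Fetched("demo-token", 300), Clock.systemUTC(), Duration.ofSeconds(60));
        System.out.println(cache.get());
    }
}
```

Because the sketch keeps the token in memory, it has the same limitation
mentioned for clusters: each node would hold and refresh its own copy.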
